made requesting best block header retry
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import base64
6 import json
7 import os
8 import random
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 if '--iocp' in sys.argv:
16     from twisted.internet import iocpreactor
17     iocpreactor.install()
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging
26 from . import p2p, networks, web, work
27 import p2pool, p2pool.data as p2pool_data
28
29 @deferral.retry('Error getting work from bitcoind:', 3)
30 @defer.inlineCallbacks
31 def getwork(bitcoind):
32     try:
33         work = yield bitcoind.rpc_getmemorypool()
34     except jsonrpc.Error, e:
35         if e.code == -32601: # Method not found
36             print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
37             raise deferral.RetrySilentlyException()
38         raise
39     packed_transactions = [x.decode('hex') for x in work['transactions']]
40     defer.returnValue(dict(
41         version=work['version'],
42         previous_block=int(work['previousblockhash'], 16),
43         transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
44         merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
45         subsidy=work['coinbasevalue'],
46         time=work['time'],
47         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
48         coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
49         clock_offset=time.time() - work['time'],
50         last_update=time.time(),
51     ))
52
53 @defer.inlineCallbacks
54 def main(args, net, datadir_path, merged_urls, worker_endpoint):
55     try:
56         print 'p2pool (version %s)' % (p2pool.__version__,)
57         print
58         
59         # connect to bitcoind over JSON-RPC and do initial getmemorypool
60         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
61         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
62         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
63         @deferral.retry('Error while checking Bitcoin connection:', 1)
64         @defer.inlineCallbacks
65         def check():
66             if not (yield net.PARENT.RPC_CHECK(bitcoind)):
67                 print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
68                 raise deferral.RetrySilentlyException()
69             temp_work = yield getwork(bitcoind)
70             if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
71                 print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
72                 raise deferral.RetrySilentlyException()
73             defer.returnValue(temp_work)
74         temp_work = yield check()
75         
76         block_height_var = variable.Variable(None)
77         @defer.inlineCallbacks
78         def poll_height():
79             block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
80         yield poll_height()
81         task.LoopingCall(poll_height).start(60*60)
82         
83         bitcoind_warning_var = variable.Variable(None)
84         @defer.inlineCallbacks
85         def poll_warnings():
86             errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
87             bitcoind_warning_var.set(errors if errors != '' else None)
88         yield poll_warnings()
89         task.LoopingCall(poll_warnings).start(20*60)
90         
91         print '    ...success!'
92         print '    Current block hash: %x' % (temp_work['previous_block'],)
93         print '    Current block height: %i' % (block_height_var.value,)
94         print
95         
96         # connect to bitcoind over bitcoin-p2p
97         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
98         factory = bitcoin_p2p.ClientFactory(net.PARENT)
99         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
100         yield factory.getProtocol() # waits until handshake is successful
101         print '    ...success!'
102         print
103         
104         print 'Determining payout address...'
105         if args.pubkey_hash is None:
106             address_path = os.path.join(datadir_path, 'cached_payout_address')
107             
108             if os.path.exists(address_path):
109                 with open(address_path, 'rb') as f:
110                     address = f.read().strip('\r\n')
111                 print '    Loaded cached address: %s...' % (address,)
112             else:
113                 address = None
114             
115             if address is not None:
116                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
117                 if not res['isvalid'] or not res['ismine']:
118                     print '    Cached address is either invalid or not controlled by local bitcoind!'
119                     address = None
120             
121             if address is None:
122                 print '    Getting payout address from bitcoind...'
123                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
124             
125             with open(address_path, 'wb') as f:
126                 f.write(address)
127             
128             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
129         else:
130             my_pubkey_hash = args.pubkey_hash
131         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
132         print
133         
134         my_share_hashes = set()
135         my_doa_share_hashes = set()
136         
137         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
138         shared_share_hashes = set()
139         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
140         known_verified = set()
141         print "Loading shares..."
142         for i, (mode, contents) in enumerate(ss.get_shares()):
143             if mode == 'share':
144                 if contents.hash in tracker.items:
145                     continue
146                 shared_share_hashes.add(contents.hash)
147                 contents.time_seen = 0
148                 tracker.add(contents)
149                 if len(tracker.items) % 1000 == 0 and tracker.items:
150                     print "    %i" % (len(tracker.items),)
151             elif mode == 'verified_hash':
152                 known_verified.add(contents)
153             else:
154                 raise AssertionError()
155         print "    ...inserting %i verified shares..." % (len(known_verified),)
156         for h in known_verified:
157             if h not in tracker.items:
158                 ss.forget_verified_share(h)
159                 continue
160             tracker.verified.add(tracker.items[h])
161         print "    ...done loading %i shares!" % (len(tracker.items),)
162         print
163         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
164         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
165         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
166         
167         print 'Initializing work...'
168         
169         
170         # BITCOIND WORK
171         
172         bitcoind_work = variable.Variable((yield getwork(bitcoind)))
173         @defer.inlineCallbacks
174         def work_poller():
175             while True:
176                 flag = factory.new_block.get_deferred()
177                 try:
178                     bitcoind_work.set((yield getwork(bitcoind)))
179                 except:
180                     log.err()
181                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
182         work_poller()
183         
184         # PEER WORK
185         
186         best_block_header = variable.Variable(None)
187         def handle_header(new_header):
188             # check that header matches current target
189             if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
190                 return
191             bitcoind_best_block = bitcoind_work.value['previous_block']
192             if (best_block_header.value is None
193                 or (
194                     new_header['previous_block'] == bitcoind_best_block and
195                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
196                 ) # new is child of current and previous is current
197                 or (
198                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
199                     best_block_header.value['previous_block'] != bitcoind_best_block
200                 )): # new is current and previous is not a child of current
201                 best_block_header.set(new_header)
202         @defer.inlineCallbacks
203         def poll_header():
204             handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
205         bitcoind_work.changed.watch(lambda _: poll_header())
206         yield deferral.retry('Error while requesting best block header:')(poll_header)()
207         
208         # BEST SHARE
209         
210         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
211         requested = expiring_dict.ExpiringDict(300)
212         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
213         
214         best_share_var = variable.Variable(None)
215         def set_best_share():
216             best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
217             
218             best_share_var.set(best)
219             
220             t = time.time()
221             for peer2, share_hash in desired:
222                 if share_hash not in tracker.tails: # was received in the time tracker.think was running
223                     continue
224                 last_request_time, count = requested.get(share_hash, (None, 0))
225                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
226                     continue
227                 potential_peers = set()
228                 for head in tracker.tails[share_hash]:
229                     potential_peers.update(peer_heads.get(head, set()))
230                 potential_peers = [peer for peer in potential_peers if peer.connected2]
231                 if count == 0 and peer2 is not None and peer2.connected2:
232                     peer = peer2
233                 else:
234                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
235                     if peer is None:
236                         continue
237                 
238                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
239                 peer.send_getshares(
240                     hashes=[share_hash],
241                     parents=2000,
242                     stops=list(set(tracker.heads) | set(
243                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
244                     ))[:100],
245                 )
246                 requested[share_hash] = t, count + 1
247         bitcoind_work.changed.watch(lambda _: set_best_share())
248         set_best_share()
249         
250         
251         print '    ...success!'
252         print
253         
254         # setup p2p logic and join p2pool network
255         
256         class Node(p2p.Node):
257             def handle_shares(self, shares, peer):
258                 if len(shares) > 5:
259                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
260                 
261                 new_count = 0
262                 for share in shares:
263                     if share.hash in tracker.items:
264                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
265                         continue
266                     
267                     new_count += 1
268                     
269                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
270                     
271                     tracker.add(share)
272                 
273                 if shares and peer is not None:
274                     peer_heads.setdefault(shares[0].hash, set()).add(peer)
275                 
276                 if new_count:
277                     set_best_share()
278                 
279                 if len(shares) > 5:
280                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
281             
282             def handle_share_hashes(self, hashes, peer):
283                 t = time.time()
284                 get_hashes = []
285                 for share_hash in hashes:
286                     if share_hash in tracker.items:
287                         continue
288                     last_request_time, count = requested.get(share_hash, (None, 0))
289                     if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
290                         continue
291                     print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
292                     get_hashes.append(share_hash)
293                     requested[share_hash] = t, count + 1
294                 
295                 if hashes and peer is not None:
296                     peer_heads.setdefault(hashes[0], set()).add(peer)
297                 if get_hashes:
298                     peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
299             
300             def handle_get_shares(self, hashes, parents, stops, peer):
301                 parents = min(parents, 1000//len(hashes))
302                 stops = set(stops)
303                 shares = []
304                 for share_hash in hashes:
305                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
306                         if share.hash in stops:
307                             break
308                         shares.append(share)
309                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
310                 return shares
311             
312             def handle_bestblock(self, header, peer):
313                 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
314                     raise p2p.PeerMisbehavingError('received block header fails PoW test')
315                 handle_header(header)
316         
317         @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
318         def submit_block_p2p(block):
319             if factory.conn.value is None:
320                 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
321                 raise deferral.RetrySilentlyException()
322             factory.conn.value.send_block(block=block)
323         
324         @deferral.retry('Error submitting block: (will retry)', 10, 10)
325         @defer.inlineCallbacks
326         def submit_block_rpc(block, ignore_failure):
327             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
328             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
329             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
330                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
331         
332         def submit_block(block, ignore_failure):
333             submit_block_p2p(block)
334             submit_block_rpc(block, ignore_failure)
335         
336         @tracker.verified.added.watch
337         def _(share):
338             if share.pow_hash <= share.header['bits'].target:
339                 submit_block(share.as_block(tracker), ignore_failure=True)
340                 print
341                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
342                 print
343                 def spread():
344                     if (get_height_rel_highest(share.header['previous_block']) > -5 or
345                         bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
346                         broadcast_share(share.hash)
347                 spread()
348                 reactor.callLater(5, spread) # so get_height_rel_highest can update
349         
350         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
351         
352         @defer.inlineCallbacks
353         def parse(x):
354             if ':' in x:
355                 ip, port = x.split(':')
356                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
357             else:
358                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
359         
360         addrs = {}
361         if os.path.exists(os.path.join(datadir_path, 'addrs')):
362             try:
363                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
364                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
365             except:
366                 print >>sys.stderr, 'error parsing addrs'
367         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
368             try:
369                 addr = yield addr_df
370                 if addr not in addrs:
371                     addrs[addr] = (0, time.time(), time.time())
372             except:
373                 log.err()
374         
375         connect_addrs = set()
376         for addr_df in map(parse, args.p2pool_nodes):
377             try:
378                 connect_addrs.add((yield addr_df))
379             except:
380                 log.err()
381         
382         p2p_node = Node(
383             best_share_hash_func=lambda: best_share_var.value,
384             port=args.p2pool_port,
385             net=net,
386             addr_store=addrs,
387             connect_addrs=connect_addrs,
388             max_incoming_conns=args.p2pool_conns,
389         )
390         p2p_node.start()
391         
392         def save_addrs():
393             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
394                 f.write(json.dumps(p2p_node.addr_store.items()))
395         task.LoopingCall(save_addrs).start(60)
396         
397         @best_block_header.changed.watch
398         def _(header):
399             for peer in p2p_node.peers.itervalues():
400                 peer.send_bestblock(header=header)
401         
402         @defer.inlineCallbacks
403         def broadcast_share(share_hash):
404             shares = []
405             for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
406                 if share.hash in shared_share_hashes:
407                     break
408                 shared_share_hashes.add(share.hash)
409                 shares.append(share)
410             
411             for peer in list(p2p_node.peers.itervalues()):
412                 yield peer.sendShares([share for share in shares if share.peer is not peer])
413         
414         # send share when the chain changes to their chain
415         best_share_var.changed.watch(broadcast_share)
416         
417         def save_shares():
418             for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
419                 ss.add_share(share)
420                 if share.hash in tracker.verified.items:
421                     ss.add_verified_hash(share.hash)
422         task.LoopingCall(save_shares).start(60)
423         
424         print '    ...success!'
425         print
426         
427         if args.upnp:
428             @defer.inlineCallbacks
429             def upnp_thread():
430                 while True:
431                     try:
432                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
433                         if is_lan:
434                             pm = yield portmapper.get_port_mapper()
435                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
436                     except defer.TimeoutError:
437                         pass
438                     except:
439                         if p2pool.DEBUG:
440                             log.err(None, 'UPnP error:')
441                     yield deferral.sleep(random.expovariate(1/120))
442             upnp_thread()
443         
444         # start listening for workers with a JSON-RPC server
445         
446         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
447         
448         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
449         
450         wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
451         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var)
452         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
453         
454         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
455         
456         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
457             pass
458         
459         print '    ...success!'
460         print
461         
462         
463         # done!
464         print 'Started successfully!'
465         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
466         if args.donation_percentage > 0.51:
467             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
468         elif args.donation_percentage < 0.49:
469             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
470         else:
471             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
472             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
473         print
474         
475         
476         if hasattr(signal, 'SIGALRM'):
477             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
478                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
479             ))
480             signal.siginterrupt(signal.SIGALRM, False)
481             task.LoopingCall(signal.alarm, 30).start(1)
482         
483         if args.irc_announce:
484             from twisted.words.protocols import irc
485             class IRCClient(irc.IRCClient):
486                 nickname = 'p2pool%02i' % (random.randrange(100),)
487                 channel = net.ANNOUNCE_CHANNEL
488                 def lineReceived(self, line):
489                     if p2pool.DEBUG:
490                         print repr(line)
491                     irc.IRCClient.lineReceived(self, line)
492                 def signedOn(self):
493                     irc.IRCClient.signedOn(self)
494                     self.factory.resetDelay()
495                     self.join(self.channel)
496                     @defer.inlineCallbacks
497                     def new_share(share):
498                         if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
499                             yield deferral.sleep(random.expovariate(1/60))
500                             message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
501                             if message not in self.recent_messages:
502                                 self.say(self.channel, message)
503                                 self._remember_message(message)
504                     self.watch_id = tracker.verified.added.watch(new_share)
505                     self.recent_messages = []
506                 def _remember_message(self, message):
507                     self.recent_messages.append(message)
508                     while len(self.recent_messages) > 100:
509                         self.recent_messages.pop(0)
510                 def privmsg(self, user, channel, message):
511                     if channel == self.channel:
512                         self._remember_message(message)
513                 def connectionLost(self, reason):
514                     tracker.verified.added.unwatch(self.watch_id)
515                     print 'IRC connection lost:', reason.getErrorMessage()
516             class IRCClientFactory(protocol.ReconnectingClientFactory):
517                 protocol = IRCClient
518             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
519         
520         @defer.inlineCallbacks
521         def status_thread():
522             last_str = None
523             last_time = 0
524             while True:
525                 yield deferral.sleep(3)
526                 try:
527                     if time.time() > bitcoind_work.value['last_update'] + 60:
528                         print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - bitcoind_work.value['last_update']),)
529                     
530                     height = tracker.get_height(best_share_var.value)
531                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
532                         height,
533                         len(tracker.verified.items),
534                         len(tracker.items),
535                         len(p2p_node.peers),
536                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
537                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
538                     
539                     datums, dt = wb.local_rate_monitor.get_datums_in_last()
540                     my_att_s = sum(datum['work']/dt for datum in datums)
541                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
542                         math.format(int(my_att_s)),
543                         math.format_dt(dt),
544                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
545                         math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
546                     )
547                     
548                     if height > 2:
549                         (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
550                         stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
551                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
552                         
553                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
554                             shares, stale_orphan_shares, stale_doa_shares,
555                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
556                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
557                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
558                         )
559                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
560                             math.format(int(real_att_s)),
561                             100*stale_prop,
562                             math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
563                         )
564                         
565                         for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value):
566                             print >>sys.stderr, '#'*40
567                             print >>sys.stderr, '>>> Warning: ' + warning
568                             print >>sys.stderr, '#'*40
569                     
570                     if this_str != last_str or time.time() > last_time + 15:
571                         print this_str
572                         last_str = this_str
573                         last_time = time.time()
574                 except:
575                     log.err()
576         status_thread()
577     except:
578         reactor.stop()
579         log.err(None, 'Fatal error:')
580
581 def run():
582     realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
583     
584     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
585     parser.add_argument('--version', action='version', version=p2pool.__version__)
586     parser.add_argument('--net',
587         help='use specified network (default: bitcoin)',
588         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
589     parser.add_argument('--testnet',
590         help='''use the network's testnet''',
591         action='store_const', const=True, default=False, dest='testnet')
592     parser.add_argument('--debug',
593         help='enable debugging mode',
594         action='store_const', const=True, default=False, dest='debug')
595     parser.add_argument('-a', '--address',
596         help='generate payouts to this address (default: <address requested from bitcoind>)',
597         type=str, action='store', default=None, dest='address')
598     parser.add_argument('--datadir',
599         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
600         type=str, action='store', default=None, dest='datadir')
601     parser.add_argument('--logfile',
602         help='''log to this file (default: data/<NET>/log)''',
603         type=str, action='store', default=None, dest='logfile')
604     parser.add_argument('--merged',
605         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
606         type=str, action='append', default=[], dest='merged_urls')
607     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
608         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
609         type=float, action='store', default=0.5, dest='donation_percentage')
610     parser.add_argument('--iocp',
611         help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
612         action='store_true', default=False, dest='iocp')
613     parser.add_argument('--irc-announce',
614         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
615         action='store_true', default=False, dest='irc_announce')
616     parser.add_argument('--no-bugreport',
617         help='disable submitting caught exceptions to the author',
618         action='store_true', default=False, dest='no_bugreport')
619     
620     p2pool_group = parser.add_argument_group('p2pool interface')
621     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
622         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
623         type=int, action='store', default=None, dest='p2pool_port')
624     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
625         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
626         type=str, action='append', default=[], dest='p2pool_nodes')
627     parser.add_argument('--disable-upnp',
628         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
629         action='store_false', default=True, dest='upnp')
630     p2pool_group.add_argument('--max-conns', metavar='CONNS',
631         help='maximum incoming connections (default: 40)',
632         type=int, action='store', default=40, dest='p2pool_conns')
633     
634     worker_group = parser.add_argument_group('worker interface')
635     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
636         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
637         type=str, action='store', default=None, dest='worker_endpoint')
638     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
639         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
640         type=float, action='store', default=0, dest='worker_fee')
641     
642     bitcoind_group = parser.add_argument_group('bitcoind interface')
643     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
644         help='connect to this address (default: 127.0.0.1)',
645         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
646     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
647         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
648         type=int, action='store', default=None, dest='bitcoind_rpc_port')
649     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
650         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
651         type=int, action='store', default=None, dest='bitcoind_p2p_port')
652     
653     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
654         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
655         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
656     
657     args = parser.parse_args()
658     
659     if args.debug:
660         p2pool.DEBUG = True
661         defer.setDebugging(True)
662     
663     net_name = args.net_name + ('_testnet' if args.testnet else '')
664     net = networks.nets[net_name]
665     
666     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
667     if not os.path.exists(datadir_path):
668         os.makedirs(datadir_path)
669     
670     if len(args.bitcoind_rpc_userpass) > 2:
671         parser.error('a maximum of two arguments are allowed')
672     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
673     
674     if args.bitcoind_rpc_password is None:
675         conf_path = net.PARENT.CONF_FILE_FUNC()
676         if not os.path.exists(conf_path):
677             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
678                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
679                 '''\r\n'''
680                 '''server=1\r\n'''
681                 '''rpcpassword=%x\r\n'''
682                 '''\r\n'''
683                 '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
684         with open(conf_path, 'rb') as f:
685             cp = ConfigParser.RawConfigParser()
686             cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
687             for conf_name, var_name, var_type in [
688                 ('rpcuser', 'bitcoind_rpc_username', str),
689                 ('rpcpassword', 'bitcoind_rpc_password', str),
690                 ('rpcport', 'bitcoind_rpc_port', int),
691                 ('port', 'bitcoind_p2p_port', int),
692             ]:
693                 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
694                     setattr(args, var_name, var_type(cp.get('x', conf_name)))
695         if args.bitcoind_rpc_password is None:
696             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
697     
698     if args.bitcoind_rpc_username is None:
699         args.bitcoind_rpc_username = ''
700     
701     if args.bitcoind_rpc_port is None:
702         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
703     
704     if args.bitcoind_p2p_port is None:
705         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
706     
707     if args.p2pool_port is None:
708         args.p2pool_port = net.P2P_PORT
709     
710     if args.worker_endpoint is None:
711         worker_endpoint = '', net.WORKER_PORT
712     elif ':' not in args.worker_endpoint:
713         worker_endpoint = '', int(args.worker_endpoint)
714     else:
715         addr, port = args.worker_endpoint.rsplit(':', 1)
716         worker_endpoint = addr, int(port)
717     
718     if args.address is not None:
719         try:
720             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
721         except Exception, e:
722             parser.error('error parsing address: ' + repr(e))
723     else:
724         args.pubkey_hash = None
725     
726     def separate_url(url):
727         s = urlparse.urlsplit(url)
728         if '@' not in s.netloc:
729             parser.error('merged url netloc must contain an "@"')
730         userpass, new_netloc = s.netloc.rsplit('@', 1)
731         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
732     merged_urls = map(separate_url, args.merged_urls)
733     
734     if args.logfile is None:
735         args.logfile = os.path.join(datadir_path, 'log')
736     
737     logfile = logging.LogFile(args.logfile)
738     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
739     sys.stdout = logging.AbortPipe(pipe)
740     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
741     if hasattr(signal, "SIGUSR1"):
742         def sigusr1(signum, frame):
743             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
744             logfile.reopen()
745             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
746         signal.signal(signal.SIGUSR1, sigusr1)
747     task.LoopingCall(logfile.reopen).start(5)
748     
749     class ErrorReporter(object):
750         def __init__(self):
751             self.last_sent = None
752         
753         def emit(self, eventDict):
754             if not eventDict["isError"]:
755                 return
756             
757             if self.last_sent is not None and time.time() < self.last_sent + 5:
758                 return
759             self.last_sent = time.time()
760             
761             if 'failure' in eventDict:
762                 text = ((eventDict.get('why') or 'Unhandled Error')
763                     + '\n' + eventDict['failure'].getTraceback())
764             else:
765                 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
766             
767             from twisted.web import client
768             client.getPage(
769                 url='http://u.forre.st/p2pool_error.cgi',
770                 method='POST',
771                 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
772                 timeout=15,
773             ).addBoth(lambda x: None)
774     if not args.no_bugreport:
775         log.addObserver(ErrorReporter().emit)
776     
777     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
778     reactor.run()