# p2pool/main.py (commit cb69c16b7be4fe1a4a1b5846c79ff18104edc18c)
from __future__ import division

import ConfigParser
import StringIO
import base64
import json
import os
import random
import sys
import time
import signal
import traceback
import urlparse

if '--iocp' in sys.argv:
    from twisted.internet import iocpreactor
    iocpreactor.install()
from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import fixargparse, jsonrpc, variable, deferral, math, logging
from . import p2p, networks, web, work
import p2pool, p2pool.data as p2pool_data

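# getwork: fetch the current block template from bitcoind, preferring getblocktemplate
# and falling back to the deprecated getmemorypool call (or vice versa) when the first
# method is unavailable, then normalize the result into the dict used throughout p2pool.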
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, use_getblocktemplate=False):
    def go():
        if use_getblocktemplate:
            return bitcoind.rpc_getblocktemplate(dict(mode='template'))
        else:
            return bitcoind.rpc_getmemorypool()
    try:
        work = yield go()
    except jsonrpc.Error_for_code(-32601): # Method not found
        use_getblocktemplate = not use_getblocktemplate
        try:
            work = yield go()
        except jsonrpc.Error_for_code(-32601): # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
    if 'height' not in work:
        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    elif p2pool.DEBUG:
        assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    defer.returnValue(dict(
        version=work['version'],
        previous_block=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'] if 'time' in work else work['curtime'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
        height=work['height'],
        last_update=time.time(),
        use_getblocktemplate=use_getblocktemplate,
    ))

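# main: the long-running node coroutine. It connects to bitcoind over JSON-RPC and P2P,
# loads the local share store, joins the p2pool network, starts the worker-facing web
# interface, and then keeps polling and broadcasting until the reactor shuts down.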
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
        temp_work = yield check()
        
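        # poll bitcoind for the current block height (hourly) and for any
        # getmininginfo warnings (every 20 minutes), exposing both as Variables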
        block_height_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_height():
            block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
        yield poll_height()
        task.LoopingCall(poll_height).start(60*60)
        
        bitcoind_warning_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_warnings():
            errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
            bitcoind_warning_var.set(errors if errors != '' else None)
        yield poll_warnings()
        task.LoopingCall(poll_warnings).start(20*60)
        
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block'],)
        print '    Current block height: %i' % (block_height_var.value,)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
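        # load previously stored shares from disk so the node does not have to
        # re-download the share chain from peers after every restart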
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.items:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.items) % 1000 == 0 and tracker.items:
                    print "    %i" % (len(tracker.items),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.items:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.items[h])
        print "    ...done loading %i shares!" % (len(tracker.items),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        print 'Initializing work...'
        
        
        # BITCOIND WORK
        
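        # bitcoind_work holds the latest block template; work_poller refreshes it
        # whenever bitcoind announces a new block over P2P, or every 15 seconds otherwise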
        bitcoind_work = variable.Variable((yield getwork(bitcoind)))
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        # PEER WORK
        
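        # track the best block header heard from peers; handle_header only accepts
        # headers that meet the current network target and that either build on or
        # equal bitcoind's current best block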
        best_block_header = variable.Variable(None)
        def handle_header(new_header):
            # check that header matches current target
            if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
                return
            bitcoind_best_block = bitcoind_work.value['previous_block']
            if (best_block_header.value is None
                or (
                    new_header['previous_block'] == bitcoind_best_block and
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
                ) # new is child of current and previous is current
                or (
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
                    best_block_header.value['previous_block'] != bitcoind_best_block
                )): # new is current and previous is not a child of current
                best_block_header.set(new_header)
        @defer.inlineCallbacks
        def poll_header():
            handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
        bitcoind_work.changed.watch(lambda _: poll_header())
        yield deferral.retry('Error while requesting best block header:')(poll_header)()
        
        # BEST SHARE
        
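        # recompute the best share (p2pool chain head) and the list of desired
        # ancestor shares whenever new bitcoind work arrives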
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
        
        best_share_var = variable.Variable(None)
        desired_var = variable.Variable(None)
        def set_best_share():
            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
            
            best_share_var.set(best)
            desired_var.set(desired)
        bitcoind_work.changed.watch(lambda _: set_best_share())
        set_best_share()
        
        print '    ...success!'
        print
        
        # setup p2p logic and join p2pool network
        
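        # Node adds the p2pool-specific message handlers on top of p2p.Node:
        # incoming shares, share-hash announcements, get_shares requests and
        # bestblock header announcements from peers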
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.items:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if new_count:
                    set_best_share()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
            
            @defer.inlineCallbacks
            def handle_share_hashes(self, hashes, peer):
                new_hashes = [x for x in hashes if x not in tracker.items]
                if not new_hashes:
                    return
                try:
                    shares = yield peer.get_shares(
                        hashes=new_hashes,
                        parents=0,
                        stops=[],
                    )
                except:
                    log.err(None, 'in handle_share_hashes:')
                else:
                    self.handle_shares(shares, peer)
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
            
            def handle_bestblock(self, header, peer):
                if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
                    raise p2p.PeerMisbehavingError('received block header fails PoW test')
                handle_header(header)
        
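        # found blocks are submitted both ways: straight to bitcoind's P2P socket
        # and over RPC (submitblock with getblocktemplate, getmemorypool(data) otherwise)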
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
        
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            if bitcoind_work.value['use_getblocktemplate']:
                result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result is None
            else:
                result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
        
        def submit_block(block, ignore_failure):
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
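        # resolve bootstrap and user-supplied peer addresses; a bare hostname
        # defaults to the network's standard p2pool P2P port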
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs')):
            try:
                with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
                    addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
            except:
                print >>sys.stderr, 'error parsing addrs'
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: best_share_var.value,
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
        
        def save_addrs():
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
        task.LoopingCall(save_addrs).start(60)
        
        @best_block_header.changed.watch
        def _(header):
            for peer in p2p_node.peers.itervalues():
                peer.send_bestblock(header=header)
        
        @defer.inlineCallbacks
        def broadcast_share(share_hash):
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in list(p2p_node.peers.itervalues()):
                yield peer.sendShares([share for share in shares if share.peer is not peer])
        
        # broadcast shares to peers whenever the best share (chain head) changes
        best_share_var.changed.watch(broadcast_share)
        
        def save_shares():
            for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.items:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
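        # background loop that requests missing ancestor shares (up to 500 parents
        # at a time) from a randomly chosen peer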
        @apply
        @defer.inlineCallbacks
        def download_shares():
            while True:
                desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
                peer2, share_hash = random.choice(desired)
                
                if len(p2p_node.peers) == 0:
                    yield deferral.sleep(1)
                    continue
                peer = random.choice(p2p_node.peers.values())
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                try:
                    shares = yield peer.get_shares(
                        hashes=[share_hash],
                        parents=500,
                        stops=[],
                    )
                except:
                    log.err(None, 'in download_shares:')
                else:
                    p2p_node.handle_shares(shares, peer)
        
        print '    ...success!'
        print
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
        
        wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
        web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var)
        worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with the --give-author option! (or decrease it, if you must)'
        print
        
        
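        # watchdog: re-arm a 30-second SIGALRM every second so that if the reactor
        # stalls, a stack trace of where it was stuck gets written to stderr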
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
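        # periodically print a status line: share counts, peers, local/pool hash
        # rates, stale rates, expected payout, plus any share-chain warnings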
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    height = tracker.get_height(best_share_var.value)
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.items),
                        len(tracker.items),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = wb.local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
                        )
                        
                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        reactor.stop()
        log.err(None, 'Fatal error:')

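# run: parse the command line and bitcoin.conf, set up logging and the optional
# bug reporter, then start main() under the Twisted reactor.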
def run():
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--iocp',
        help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
        action='store_true', default=False, dest='iocp')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    parser.add_argument('--no-bugreport',
        help='disable submitting caught exceptions to the author',
        action='store_true', default=False, dest='no_bugreport')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
        defer.setDebugging(True)
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
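    # without an RPC password on the command line, read credentials and ports
    # from bitcoin.conf instead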
    if args.bitcoind_rpc_password is None:
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x\r\n'''
                '''\r\n'''
                '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
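    # tee stdout/stderr through a timestamping pipe into both the console and the
    # logfile; SIGUSR1 (where available) and a 5-second LoopingCall reopen the logfile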
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
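    # optional log observer that POSTs error tracebacks to the author's server,
    # rate-limited to one report every 5 seconds (disable with --no-bugreport)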
    class ErrorReporter(object):
        def __init__(self):
            self.last_sent = None
        
        def emit(self, eventDict):
            if not eventDict["isError"]:
                return
            
            if self.last_sent is not None and time.time() < self.last_sent + 5:
                return
            self.last_sent = time.time()
            
            if 'failure' in eventDict:
                text = ((eventDict.get('why') or 'Unhandled Error')
                    + '\n' + eventDict['failure'].getTraceback())
            else:
                text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
            
            from twisted.web import client
            client.getPage(
                url='http://u.forre.st/p2pool_error.cgi',
                method='POST',
                postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
                timeout=15,
            ).addBoth(lambda x: None)
    if not args.no_bugreport:
        log.addObserver(ErrorReporter().emit)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()