[p2pool.git] / p2pool / main.py (commit: added network traffic graph)
from __future__ import division

import ConfigParser
import StringIO
import base64
import json
import os
import random
import sys
import time
import signal
import traceback
import urlparse

if '--iocp' in sys.argv:
    from twisted.internet import iocpreactor
    iocpreactor.install()
from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import fixargparse, jsonrpc, variable, deferral, math, logging
from . import p2p, networks, web, work
import p2pool, p2pool.data as p2pool_data

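# getwork: fetch a block template from bitcoind over JSON-RPC, preferring
# getblocktemplate and falling back to the older getmemorypool call if the
# method is missing, then normalize the result into the dict used below.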
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, use_getblocktemplate=False):
    def go():
        if use_getblocktemplate:
            return bitcoind.rpc_getblocktemplate(dict(mode='template'))
        else:
            return bitcoind.rpc_getmemorypool()
    try:
        work = yield go()
    except jsonrpc.Error_for_code(-32601): # Method not found
        use_getblocktemplate = not use_getblocktemplate
        try:
            work = yield go()
        except jsonrpc.Error_for_code(-32601): # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
    if 'height' not in work:
        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    elif p2pool.DEBUG:
        assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    defer.returnValue(dict(
        version=work['version'],
        previous_block=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'] if 'time' in work else work['curtime'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
        height=work['height'],
        last_update=time.time(),
        use_getblocktemplate=use_getblocktemplate,
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print

        traffic_happened = variable.Event()

        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
                print >>sys.stderr, '    Bitcoin version too old! Upgrade to 0.6.4 or newer!'
                raise deferral.RetrySilentlyException()
        yield check()
        temp_work = yield getwork(bitcoind)

        block_height_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_height():
            block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
        yield poll_height()
        task.LoopingCall(poll_height).start(60*60)

        bitcoind_warning_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_warnings():
            errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
            bitcoind_warning_var.set(errors if errors != '' else None)
        yield poll_warnings()
        task.LoopingCall(poll_warnings).start(20*60)

        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block'],)
        print '    Current block height: %i' % (block_height_var.value,)
        print

        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print

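        # Payout address resolution: use --address if given; otherwise reuse the
        # address cached in <datadir>/cached_payout_address (re-validated against
        # bitcoind), falling back to bitcoind's 'p2pool' account address.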
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')

            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None

            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None

            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()

            with open(address_path, 'wb') as f:
                f.write(address)

            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print

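        # Load previously seen shares from the on-disk ShareStore into the
        # OkayTracker, then mark the ones that were already verified.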
        my_share_hashes = set()
        my_doa_share_hashes = set()

        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.items:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.items) % 1000 == 0 and tracker.items:
                    print "    %i" % (len(tracker.items),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.items:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.items[h])
        print "    ...done loading %i shares!" % (len(tracker.items),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))

        print 'Initializing work...'


        # BITCOIND WORK

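        # bitcoind_work holds the latest block template; work_poller refreshes it
        # whenever bitcoind announces a new block over p2p or every 15 seconds,
        # whichever comes first.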
        bitcoind_work = variable.Variable((yield getwork(bitcoind)))
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()

        # PEER WORK

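        # best_block_header tracks the full header of bitcoind's current best
        # block, learned from the p2p connection (poll_header) or from peers via
        # handle_bestblock; only headers meeting the network target and consistent
        # with bitcoind's tip are accepted.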
        best_block_header = variable.Variable(None)
        def handle_header(new_header):
            # check that header matches current target
            if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
                return
            bitcoind_best_block = bitcoind_work.value['previous_block']
            if (best_block_header.value is None
                or (
                    new_header['previous_block'] == bitcoind_best_block and
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
                ) # new is child of current and previous is current
                or (
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
                    best_block_header.value['previous_block'] != bitcoind_best_block
                )): # new is current and previous is not a child of current
                best_block_header.set(new_header)
        @defer.inlineCallbacks
        def poll_header():
            handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
        bitcoind_work.changed.watch(lambda _: poll_header())
        yield deferral.retry('Error while requesting best block header:')(poll_header)()

        # BEST SHARE

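        # tracker.think() chooses the head of the best verified share chain
        # (best_share_var) and lists shares that look worth downloading
        # (desired_var); recomputed whenever bitcoind's work changes.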
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)

        best_share_var = variable.Variable(None)
        desired_var = variable.Variable(None)
        def set_best_share():
            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])

            best_share_var.set(best)
            desired_var.set(desired)
        bitcoind_work.changed.watch(lambda _: set_best_share())
        set_best_share()

        print '    ...success!'
        print

        # setup p2p logic and join p2pool network

        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)

                new_count = 0
                for share in shares:
                    if share.hash in tracker.items:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue

                    new_count += 1

                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)

                    tracker.add(share)

                if new_count:
                    set_best_share()

                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)

            @defer.inlineCallbacks
            def handle_share_hashes(self, hashes, peer):
                new_hashes = [x for x in hashes if x not in tracker.items]
                if not new_hashes:
                    return
                try:
                    shares = yield peer.get_shares(
                        hashes=new_hashes,
                        parents=0,
                        stops=[],
                    )
                except:
                    log.err(None, 'in handle_share_hashes:')
                else:
                    self.handle_shares(shares, peer)

            def handle_get_shares(self, hashes, parents, stops, peer):
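                # cap parents so that roughly 1000 shares at most are returned per request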
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares

            def handle_bestblock(self, header, peer):
                if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
                    raise p2p.PeerMisbehavingError('received block header fails PoW test')
                handle_header(header)

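        # Found blocks are submitted twice for redundancy: directly over the
        # bitcoin p2p connection and over JSON-RPC (submitblock, or getmemorypool
        # with a data argument on older bitcoinds).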
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)

        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            if bitcoind_work.value['use_getblocktemplate']:
                result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result is None
            else:
                result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)

        def submit_block(block, ignore_failure):
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)

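        # If a newly verified share's proof of work also meets the bitcoin
        # target, it is a full block: submit it and rebroadcast the share.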
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update

        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)

        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))

        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs')):
            try:
                with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
                    addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
            except:
                print >>sys.stderr, 'error parsing addrs'
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()

        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()

        p2p_node = Node(
            best_share_hash_func=lambda: best_share_var.value,
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
            traffic_happened=traffic_happened,
        )
        p2p_node.start()

        def save_addrs():
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
        task.LoopingCall(save_addrs).start(60)

        @best_block_header.changed.watch
        def _(header):
            for peer in p2p_node.peers.itervalues():
                peer.send_bestblock(header=header)

        @defer.inlineCallbacks
        def broadcast_share(share_hash):
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)

            for peer in list(p2p_node.peers.itervalues()):
                yield peer.sendShares([share for share in shares if share.peer is not peer])

        # announce our new best share to peers whenever the best share chain changes
        best_share_var.changed.watch(broadcast_share)

        def save_shares():
            for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.items:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)

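        # download_shares: continuously fetch missing ancestor shares listed in
        # desired_var from a randomly chosen peer.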
        @apply
        @defer.inlineCallbacks
        def download_shares():
            while True:
                desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
                peer2, share_hash = random.choice(desired)

                if len(p2p_node.peers) == 0:
                    yield deferral.sleep(1)
                    continue
                peer = random.choice(p2p_node.peers.values())

                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                try:
                    shares = yield peer.get_shares(
                        hashes=[share_hash],
                        parents=500,
                        stops=[],
                    )
                except:
                    log.err(None, 'in download_shares:')
                else:
                    p2p_node.handle_shares(shares, peer)

        print '    ...success!'
        print

        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()

        # start listening for workers with a JSON-RPC server

        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])

        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)

        wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
        web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened)
        worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))

        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])

        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass

        print '    ...success!'
        print


        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
        print


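        # Watchdog: signal.alarm(30) is re-armed every second; if the reactor
        # stalls for more than 30 seconds, SIGALRM fires and a stack trace is
        # written to stderr.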
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)

        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())

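        # status_thread: every 3 seconds, print chain length, peer counts, local
        # and pool hashrate, stale rate, and expected payout (when the line has
        # changed or 15 seconds have passed).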
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    height = tracker.get_height(best_share_var.value)
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.items),
                        len(tracker.items),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')

                    datums, dt = wb.local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                    )

                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)

                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
                        )

                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40

                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        reactor.stop()
        log.err(None, 'Fatal error:')

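# run: parse command-line arguments, fill in defaults from bitcoin.conf, set up
# logging and error reporting, then start main() under the Twisted reactor.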
def run():
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)

    parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--iocp',
        help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
        action='store_true', default=False, dest='iocp')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    parser.add_argument('--no-bugreport',
        help='disable submitting caught exceptions to the author',
        action='store_true', default=False, dest='no_bugreport')

    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')

    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')

    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')

    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')

    args = parser.parse_args()

    if args.debug:
        p2pool.DEBUG = True
        defer.setDebugging(True)

    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]

    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)

    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]

    if args.bitcoind_rpc_password is None:
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x\r\n'''
                '''\r\n'''
                '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')

    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''

    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT

    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT

    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT

    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)

    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None

    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)

    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')

    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)

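    # ErrorReporter: unless --no-bugreport is given, POST a short report of any
    # logged error to the author's server, at most once every 5 seconds.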
    class ErrorReporter(object):
        def __init__(self):
            self.last_sent = None

        def emit(self, eventDict):
            if not eventDict["isError"]:
                return

            if self.last_sent is not None and time.time() < self.last_sent + 5:
                return
            self.last_sent = time.time()

            if 'failure' in eventDict:
                text = ((eventDict.get('why') or 'Unhandled Error')
                    + '\n' + eventDict['failure'].getTraceback())
            else:
                text = " ".join([str(m) for m in eventDict["message"]]) + "\n"

            from twisted.web import client
            client.getPage(
                url='http://u.forre.st/p2pool_error.cgi',
                method='POST',
                postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
                timeout=15,
            ).addBoth(lambda x: None)
    if not args.no_bugreport:
        log.addObserver(ErrorReporter().emit)

    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()
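
# Example invocation (illustrative values; the usual entry point is run_p2pool.py,
# which calls run()):
#   python run_p2pool.py --net bitcoin --bitcoind-address 127.0.0.1 rpcuser rpcpass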