4f16daa65d06fa08c2782fe2d819af54dfaf1cce
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import base64
6 import json
7 import os
8 import random
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 if '--iocp' in sys.argv:
16     from twisted.internet import iocpreactor
17     iocpreactor.install()
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import fixargparse, jsonrpc, variable, deferral, math, logging
26 from . import p2p, networks, web, work
27 import p2pool, p2pool.data as p2pool_data
28
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, use_getblocktemplate=False):
    """Fetch a block template from bitcoind over JSON-RPC.
    
    Tries rpc_getblocktemplate or rpc_getmemorypool depending on
    use_getblocktemplate; if bitcoind reports "method not found"
    (-32601), flips to the other RPC once before giving up.
    
    Returns (via defer.returnValue) a dict with the normalized work:
    version, previous_block, unpacked transactions, merkle_link,
    subsidy, time, bits, coinbaseflags, height, last_update, and the
    use_getblocktemplate flag that actually worked.
    """
    def fetch():
        # closes over use_getblocktemplate so the retry below sees the flipped flag
        if not use_getblocktemplate:
            return bitcoind.rpc_getmemorypool()
        return bitcoind.rpc_getblocktemplate({})
    try:
        work = yield fetch()
    except jsonrpc.Error_for_code(-32601): # Method not found
        # preferred RPC unavailable on this bitcoind; try the other one
        use_getblocktemplate = not use_getblocktemplate
        try:
            work = yield fetch()
        except jsonrpc.Error_for_code(-32601): # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    
    # getblocktemplate wraps each tx in a dict; getmemorypool gives bare hex
    packed_transactions = []
    for tx in work['transactions']:
        packed_transactions.append((tx['data'] if isinstance(tx, dict) else tx).decode('hex'))
    
    if 'height' not in work:
        # getmemorypool omits height; derive it from the previous block
        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    elif p2pool.DEBUG:
        assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    
    # normalize the fields whose names/encodings differ between the two RPCs
    if 'time' in work:
        timestamp = work['time']
    else:
        timestamp = work['curtime']
    if isinstance(work['bits'], (str, unicode)):
        # hex string: decode and byte-reverse into the packed representation
        bits = bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1])
    else:
        bits = bitcoin_data.FloatingInteger(work['bits'])
    if 'coinbaseflags' in work:
        coinbaseflags = work['coinbaseflags'].decode('hex')
    elif 'coinbaseaux' in work:
        coinbaseflags = ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues())
    else:
        coinbaseflags = ''
    
    defer.returnValue(dict(
        version=work['version'],
        previous_block=int(work['previousblockhash'], 16),
        transactions=[bitcoin_data.tx_type.unpack(p) for p in packed_transactions],
        merkle_link=bitcoin_data.calculate_merkle_link([None] + [bitcoin_data.hash256(p) for p in packed_transactions], 0),
        subsidy=work['coinbasevalue'],
        time=timestamp,
        bits=bits,
        coinbaseflags=coinbaseflags,
        height=work['height'],
        last_update=time.time(),
        use_getblocktemplate=use_getblocktemplate,
    ))
64
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    """Start the p2pool node and run until the reactor stops.
    
    Startup sequence (each stage prints progress):
      1. verify bitcoind JSON-RPC and P2P connectivity
      2. determine the payout address (cached, or fetched from bitcoind)
      3. load saved shares from disk into the tracker
      4. begin polling bitcoind for work and block headers
      5. join the p2pool P2P network and start share exchange
      6. start the worker-facing JSON-RPC/web server
      7. optionally set up UPnP port mapping and IRC block announcements
      8. loop printing status every few seconds
    
    args            -- parsed command-line namespace (see run())
    net             -- network definition module (bitcoin/testnet/altcoin)
    datadir_path    -- directory for shares, cached address, addrs, flags
    merged_urls     -- merged-mining daemon URLs, passed to WorkerBridge
    worker_endpoint -- (interface, port) tuple for the worker server
    
    Any exception during startup stops the reactor and logs a fatal error.
    """
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        # check() verifies we're on the right network and bitcoind is new
        # enough; retried every second until it succeeds
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
        temp_work = yield check()
        
        # current bitcoind block height, refreshed hourly
        block_height_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_height():
            block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
        yield poll_height()
        task.LoopingCall(poll_height).start(60*60)
        
        # bitcoind's getmininginfo 'errors' string (None when empty),
        # refreshed every 20 minutes; surfaced via the web interface
        bitcoind_warning_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_warnings():
            errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
            bitcoind_warning_var.set(errors if errors != '' else None)
        yield poll_warnings()
        task.LoopingCall(poll_warnings).start(20*60)
        
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block'],)
        print '    Current block height: %i' % (block_height_var.value,)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            # no address on the command line: use the cached one if it is
            # still valid and owned by this bitcoind, otherwise ask bitcoind
            # for a fresh 'p2pool' account address and cache it
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
        # hashes of shares/dead-on-arrival shares produced locally, filled
        # in by the tracker and WorkerBridge
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        # shares already sent to (or received from) peers, so broadcast_share
        # can stop walking the chain once it hits known territory
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.items:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.items) % 1000 == 0 and tracker.items:
                    print "    %i" % (len(tracker.items),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.items:
                # verified hash with no matching share on disk: drop it
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.items[h])
        print "    ...done loading %i shares!" % (len(tracker.items),)
        print
        # keep the on-disk store and shared-set in sync as the tracker prunes
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        print 'Initializing work...'
        
        
        # BITCOIND WORK
        
        bitcoind_work = variable.Variable((yield getwork(bitcoind)))
        @defer.inlineCallbacks
        def work_poller():
            # refresh work whenever bitcoind announces a new block over P2P,
            # or every 15 seconds, whichever comes first
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        # PEER WORK
        
        # best block header heard from peers/bitcoind; used to detect blocks
        # slightly ahead of what bitcoind has fully processed
        best_block_header = variable.Variable(None)
        def handle_header(new_header):
            # check that header matches current target
            if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
                return
            bitcoind_best_block = bitcoind_work.value['previous_block']
            if (best_block_header.value is None
                or (
                    new_header['previous_block'] == bitcoind_best_block and
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
                ) # new is child of current and previous is current
                or (
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
                    best_block_header.value['previous_block'] != bitcoind_best_block
                )): # new is current and previous is not a child of current
                best_block_header.set(new_header)
        @defer.inlineCallbacks
        def poll_header():
            handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
        bitcoind_work.changed.watch(lambda _: poll_header())
        yield deferral.retry('Error while requesting best block header:')(poll_header)()
        
        # BEST SHARE
        
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
        
        # best_share_var: head of the share chain we mine on;
        # desired_var: (peer, share_hash) pairs the tracker wants fetched
        best_share_var = variable.Variable(None)
        desired_var = variable.Variable(None)
        def set_best_share():
            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
            
            best_share_var.set(best)
            desired_var.set(desired)
        bitcoind_work.changed.watch(lambda _: set_best_share())
        set_best_share()
        
        print '    ...success!'
        print
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            """p2p.Node subclass wiring peer messages into the local tracker."""
            def handle_shares(self, shares, peer):
                # add previously-unseen shares to the tracker; re-evaluate the
                # best share only if anything new arrived
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.items:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if new_count:
                    set_best_share()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
            
            @defer.inlineCallbacks
            def handle_share_hashes(self, hashes, peer):
                # a peer advertised share hashes; request the ones we lack
                new_hashes = [x for x in hashes if x not in tracker.items]
                if not new_hashes:
                    return
                try:
                    shares = yield peer.get_shares(
                        hashes=new_hashes,
                        parents=0,
                        stops=[],
                    )
                except:
                    log.err(None, 'in handle_share_hashes:')
                else:
                    self.handle_shares(shares, peer)
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                # serve shares to a peer, capping total parents at 1000 across
                # all requested hashes to bound the response size
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
            
            def handle_bestblock(self, header, peer):
                # reject headers that don't meet their own claimed difficulty
                if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
                    raise p2p.PeerMisbehavingError('received block header fails PoW test')
                handle_header(header)
        
        # a found block is submitted both over bitcoind's P2P socket and over
        # JSON-RPC, each with independent retry
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
        
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            if bitcoind_work.value['use_getblocktemplate']:
                success = yield bitcoind.rpc_getblocktemplate(dict(data=bitcoin_data.block_type.pack(block).encode('hex')))
            else:
                success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            # warn when bitcoind's acceptance disagrees with our own PoW check
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        def submit_block(block, ignore_failure):
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
        
        @tracker.verified.added.watch
        def _(share):
            # a verified share that meets the block target is a found block:
            # submit it to bitcoind and rebroadcast it to peers
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    # only rebroadcast if the block is near the current tip
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            # resolve a 'host' or 'host:port' string to an (ip, port) tuple,
            # defaulting to the network's standard P2P port
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        # known peer addresses: saved set from disk plus bootstrap nodes
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs')):
            try:
                with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
                    addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
            except:
                print >>sys.stderr, 'error parsing addrs'
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        # explicitly requested peers from --p2pool-node
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: best_share_var.value,
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
        
        # persist the peer address book every minute
        def save_addrs():
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
        task.LoopingCall(save_addrs).start(60)
        
        # relay newly seen best block headers to all peers
        @best_block_header.changed.watch
        def _(header):
            for peer in p2p_node.peers.itervalues():
                peer.send_bestblock(header=header)
        
        @defer.inlineCallbacks
        def broadcast_share(share_hash):
            # send up to 5 ancestors of share_hash that peers haven't seen yet,
            # skipping each peer's own shares
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in list(p2p_node.peers.itervalues()):
                yield peer.sendShares([share for share in shares if share.peer is not peer])
        
        # send share when the chain changes to their chain
        best_share_var.changed.watch(broadcast_share)
        
        # persist the active share chain (and verified marks) every minute
        def save_shares():
            for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.items:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        # @apply starts this inlineCallbacks loop immediately at definition
        @apply
        @defer.inlineCallbacks
        def download_shares():
            # forever: wait until the tracker wants a share, then ask a random
            # peer for it plus up to 500 parents
            while True:
                desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
                peer2, share_hash = random.choice(desired)
                
                if len(p2p_node.peers) == 0:
                    yield deferral.sleep(1)
                    continue
                peer = random.choice(p2p_node.peers.values())
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                try:
                    shares = yield peer.get_shares(
                        hashes=[share_hash],
                        parents=500,
                        stops=[],
                    )
                except:
                    log.err(None, 'in download_shares:')
                else:
                    p2p_node.handle_shares(shares, peer)
        
        print '    ...success!'
        print
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                # periodically (re-)establish a UPnP port mapping; failures
                # are non-fatal and only logged in debug mode
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    # mean 120 seconds between attempts (true division is active)
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
        
        wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
        web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var)
        worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        # touch a flag file so wrapper scripts can tell startup completed
        with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
        print
        
        
        # watchdog: SIGALRM fires if the reactor stalls for 30s; the
        # LoopingCall re-arms the alarm every second while things are healthy
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                # announces blocks found by the pool to the network's IRC channel
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        # only announce fresh blocks (< 10 min old), after a
                        # random delay, and only if nobody announced it already
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    # keep a bounded window of the last 100 channel messages
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
        @defer.inlineCallbacks
        def status_thread():
            # print a status summary every 3 seconds, but only when it changed
            # or 15 seconds have passed since the last print
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    height = tracker.get_height(best_share_var.value)
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.items),
                        len(tracker.items),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = wb.local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
                        )
                        
                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        # any startup failure is fatal: stop the reactor and log it
        reactor.stop()
        log.err(None, 'Fatal error:')
582
def run():
    """Parse command-line options (with bitcoin.conf fallback), set up logging,
    crash reporting and signal handling, then start the Twisted reactor with
    main() scheduled as the entry point.
    
    This function never returns until the reactor stops.
    """
    # Map of "real" (non-testnet) network names -> network definition objects;
    # used both for --net choices and for building per-network default-port help text.
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    # fromfile_prefix_chars='@' lets users keep options in a file and pass @filename.
    parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--iocp',
        help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
        action='store_true', default=False, dest='iocp')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    parser.add_argument('--no-bugreport',
        help='disable submitting caught exceptions to the author',
        action='store_true', default=False, dest='no_bugreport')
    
    # --- p2pool P2P interface options ---
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    # NOTE(review): registered on `parser` rather than `p2pool_group`, unlike the
    # surrounding options — this only affects which section it appears under in
    # --help output; confirm whether that grouping is intentional.
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    # --- miner-facing worker interface options ---
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    # --- upstream bitcoind connection options ---
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # Positional: zero, one or two tokens -> (username, password); see padding logic below.
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
        defer.setDebugging(True)  # make Twisted Deferreds record creation/callback tracebacks
    
    # Testnet variants are registered in networks.nets under "<name>_testnet".
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    # Per-network data directory; created on first run.
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    # Left-pad with Nones and take the last two, so:
    #   []        -> (None, None)   (both read from bitcoin.conf below)
    #   [pass]    -> (None, pass)   (username later defaults to '')
    #   [user, pass] -> (user, pass)
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    if args.bitcoind_rpc_password is None:
        # No password on the command line: fall back to the parent coin's config file.
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x\r\n'''
                '''\r\n'''
                '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            # bitcoin.conf has no section headers; prepend a dummy [x] section so
            # RawConfigParser will accept it.
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            # Config values only fill in settings not already given on the command line.
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    # Remaining unset ports default to the network's standard values.
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    # Worker endpoint accepts "PORT" or "ADDR:PORT"; empty addr means all interfaces.
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        # rsplit so an address containing ':' still splits on the final port separator.
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    # Validate the payout address now (fail fast); pubkey_hash=None means
    # main() will request an address from bitcoind instead.
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        # Split a merged-mining URL of the form scheme://user:pass@host:port/
        # into (url-without-credentials, 'user:pass').
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    # Tee all output (timestamped) to both the console and the logfile, then
    # replace stdout/stderr so every print in the program is captured.
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    # SIGUSR1 reopens the logfile (useful with external log rotation); the
    # hasattr guard skips this on platforms without SIGUSR1 (e.g. Windows).
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    # Also reopen periodically so a rotated/deleted logfile is picked up automatically.
    task.LoopingCall(logfile.reopen).start(5)
    
    class ErrorReporter(object):
        """Twisted log observer that POSTs error events to the author's
        bug-collection endpoint, rate-limited to one report per 5 seconds."""
        def __init__(self):
            self.last_sent = None
        
        def emit(self, eventDict):
            if not eventDict["isError"]:
                return
            
            # Rate limit: drop reports within 5 seconds of the last one sent.
            if self.last_sent is not None and time.time() < self.last_sent + 5:
                return
            self.last_sent = time.time()
            
            if 'failure' in eventDict:
                text = ((eventDict.get('why') or 'Unhandled Error')
                    + '\n' + eventDict['failure'].getTraceback())
            else:
                text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
            
            from twisted.web import client
            # Fire-and-forget: addBoth swallows both success and failure of the POST.
            client.getPage(
                url='http://u.forre.st/p2pool_error.cgi',
                method='POST',
                postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
                timeout=15,
            ).addBoth(lambda x: None)
    if not args.no_bugreport:
        log.addObserver(ErrorReporter().emit)
    
    # Hand off to main() once the reactor is running; blocks until reactor.stop().
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()