moved the 'lost contact with bitcoind' message into the warnings handler so that it appears on the web GUI
[p2pool.git] / p2pool/main.py
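
For context on the change above: getwork() below records a last_update timestamp in the work dict, and status_thread near the end of main() calls p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value). The following is a minimal sketch of what a warnings handler with that call signature could look like; the 60-second threshold, the message text, and the function body are illustrative assumptions, not the actual p2pool/data.py implementation.

    import time

    def get_warnings(tracker, best_share, net, bitcoind_getinfo_errors, bitcoind_work_value):
        warnings = []

        # pass through whatever bitcoind itself reports (the getmininginfo 'errors'
        # string that main() polls into bitcoind_warning_var)
        if bitcoind_getinfo_errors is not None:
            warnings.append('(from bitcoind) %s' % (bitcoind_getinfo_errors,))

        # if getwork has not succeeded recently, report loss of contact from here
        # instead of only printing it in the polling loop, so the warnings list
        # consumed by the web GUI includes it as well (threshold is an assumption)
        age = time.time() - bitcoind_work_value['last_update']
        if age > 60:
            warnings.append("LOST CONTACT WITH BITCOIND for %.0f seconds! "
                "Check that it isn't frozen or dead!" % (age,))

        # (the real function also derives share-chain warnings from tracker,
        # best_share and net; those are omitted in this sketch)
        return warnings

Because status_thread and the web code both consume this list, the lost-contact condition reaches the web GUI instead of only being printed by the work poller, which is the point of this commit.
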
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import base64
6 import json
7 import os
8 import random
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 if '--iocp' in sys.argv:
16     from twisted.internet import iocpreactor
17     iocpreactor.install()
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging
26 from . import p2p, networks, web, work
27 import p2pool, p2pool.data as p2pool_data
28
29 @deferral.retry('Error getting work from bitcoind:', 3)
30 @defer.inlineCallbacks
31 def getwork(bitcoind):
32     try:
33         work = yield bitcoind.rpc_getmemorypool()
34     except jsonrpc.Error, e:
35         if e.code == -32601: # Method not found
36             print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
37             raise deferral.RetrySilentlyException()
38         raise
39     packed_transactions = [x.decode('hex') for x in work['transactions']]
40     if 'height' not in work:
41         work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
42     defer.returnValue(dict(
43         previous_block=int(work['previousblockhash'], 16),
44         transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
45         merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
46         subsidy=work['coinbasevalue'],
47         time=work['time'],
48         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
49         coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
50         height=work['height'],
51         clock_offset=time.time() - work['time'],
52         last_update=time.time(),
53     ))
54
55 @defer.inlineCallbacks
56 def main(args, net, datadir_path, merged_urls, worker_endpoint):
57     try:
58         print 'p2pool (version %s)' % (p2pool.__version__,)
59         print
60         
61         # connect to bitcoind over JSON-RPC and do initial getmemorypool
62         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
63         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
64         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
65         @deferral.retry('Error while checking Bitcoin connection:', 1)
66         @defer.inlineCallbacks
67         def check():
68             if not (yield net.PARENT.RPC_CHECK(bitcoind)):
69                 print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
70                 raise deferral.RetrySilentlyException()
71             temp_work = yield getwork(bitcoind)
72             if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
73                 print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
74                 raise deferral.RetrySilentlyException()
75             defer.returnValue(temp_work)
76         temp_work = yield check()
77         
78         block_height_var = variable.Variable(None)
79         @defer.inlineCallbacks
80         def poll_height():
81             block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
82         yield poll_height()
83         task.LoopingCall(poll_height).start(60*60)
84         
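           # getmininginfo's 'errors' string is polled into bitcoind_warning_var; it is
           # passed to p2pool_data.get_warnings() and to web.get_web_root() further down,
           # which is how bitcoind warnings end up on the web GUI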
85         bitcoind_warning_var = variable.Variable(None)
86         @defer.inlineCallbacks
87         def poll_warnings():
88             errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
89             bitcoind_warning_var.set(errors if errors != '' else None)
90         yield poll_warnings()
91         task.LoopingCall(poll_warnings).start(20*60)
92         
93         print '    ...success!'
94         print '    Current block hash: %x' % (temp_work['previous_block'],)
95         print '    Current block height: %i' % (block_height_var.value,)
96         print
97         
98         # connect to bitcoind over bitcoin-p2p
99         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
100         factory = bitcoin_p2p.ClientFactory(net.PARENT)
101         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
102         yield factory.getProtocol() # waits until handshake is successful
103         print '    ...success!'
104         print
105         
106         print 'Determining payout address...'
107         if args.pubkey_hash is None:
108             address_path = os.path.join(datadir_path, 'cached_payout_address')
109             
110             if os.path.exists(address_path):
111                 with open(address_path, 'rb') as f:
112                     address = f.read().strip('\r\n')
113                 print '    Loaded cached address: %s...' % (address,)
114             else:
115                 address = None
116             
117             if address is not None:
118                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
119                 if not res['isvalid'] or not res['ismine']:
120                     print '    Cached address is either invalid or not controlled by local bitcoind!'
121                     address = None
122             
123             if address is None:
124                 print '    Getting payout address from bitcoind...'
125                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
126             
127             with open(address_path, 'wb') as f:
128                 f.write(address)
129             
130             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
131         else:
132             my_pubkey_hash = args.pubkey_hash
133         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
134         print
135         
136         my_share_hashes = set()
137         my_doa_share_hashes = set()
138         
139         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
140         shared_share_hashes = set()
141         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
142         known_verified = set()
143         print "Loading shares..."
144         for i, (mode, contents) in enumerate(ss.get_shares()):
145             if mode == 'share':
146                 if contents.hash in tracker.items:
147                     continue
148                 shared_share_hashes.add(contents.hash)
149                 contents.time_seen = 0
150                 tracker.add(contents)
151                 if len(tracker.items) % 1000 == 0 and tracker.items:
152                     print "    %i" % (len(tracker.items),)
153             elif mode == 'verified_hash':
154                 known_verified.add(contents)
155             else:
156                 raise AssertionError()
157         print "    ...inserting %i verified shares..." % (len(known_verified),)
158         for h in known_verified:
159             if h not in tracker.items:
160                 ss.forget_verified_share(h)
161                 continue
162             tracker.verified.add(tracker.items[h])
163         print "    ...done loading %i shares!" % (len(tracker.items),)
164         print
165         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
166         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
167         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
168         
169         print 'Initializing work...'
170         
171         
172         # BITCOIND WORK
173         
174         bitcoind_work = variable.Variable((yield getwork(bitcoind)))
175         @defer.inlineCallbacks
176         def work_poller():
177             while True:
178                 flag = factory.new_block.get_deferred()
179                 try:
180                     bitcoind_work.set((yield getwork(bitcoind)))
181                 except:
182                     log.err()
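                    # wait until either bitcoind announces a new block over P2P or 15
                    # seconds pass, whichever comes first, then poll getwork again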
183                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
184         work_poller()
185         
186         # PEER WORK
187         
188         best_block_header = variable.Variable(None)
189         def handle_header(new_header):
190             # check that header matches current target
191             if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
192                 return
193             bitcoind_best_block = bitcoind_work.value['previous_block']
194             if (best_block_header.value is None
195                 or (
196                     new_header['previous_block'] == bitcoind_best_block and
197                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
198                 ) # new is child of current and previous is current
199                 or (
200                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
201                     best_block_header.value['previous_block'] != bitcoind_best_block
202                 )): # new is current and previous is not a child of current
203                 best_block_header.set(new_header)
204         @defer.inlineCallbacks
205         def poll_header():
206             handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
207         bitcoind_work.changed.watch(lambda _: poll_header())
208         yield deferral.retry('Error while requesting best block header:')(poll_header)()
209         
210         # BEST SHARE
211         
212         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
213         requested = expiring_dict.ExpiringDict(300)
214         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
215         
216         best_share_var = variable.Variable(None)
217         def set_best_share():
218             best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
219             
220             best_share_var.set(best)
221             
222             t = time.time()
223             for peer2, share_hash in desired:
224                 if share_hash not in tracker.tails: # was received in the time tracker.think was running
225                     continue
226                 last_request_time, count = requested.get(share_hash, (None, 0))
227                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
228                     continue
229                 potential_peers = set()
230                 for head in tracker.tails[share_hash]:
231                     potential_peers.update(peer_heads.get(head, set()))
232                 potential_peers = [peer for peer in potential_peers if peer.connected2]
233                 if count == 0 and peer2 is not None and peer2.connected2:
234                     peer = peer2
235                 else:
236                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
237                     if peer is None:
238                         continue
239                 
240                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
241                 peer.send_getshares(
242                     hashes=[share_hash],
243                     parents=2000,
244                     stops=list(set(tracker.heads) | set(
245                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
246                     ))[:100],
247                 )
248                 requested[share_hash] = t, count + 1
249         bitcoind_work.changed.watch(lambda _: set_best_share())
250         set_best_share()
251         
252         
253         print '    ...success!'
254         print
255         
256         # setup p2p logic and join p2pool network
257         
258         class Node(p2p.Node):
259             def handle_shares(self, shares, peer):
260                 if len(shares) > 5:
261                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
262                 
263                 new_count = 0
264                 for share in shares:
265                     if share.hash in tracker.items:
266                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
267                         continue
268                     
269                     new_count += 1
270                     
271                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
272                     
273                     tracker.add(share)
274                 
275                 if shares and peer is not None:
276                     peer_heads.setdefault(shares[0].hash, set()).add(peer)
277                 
278                 if new_count:
279                     set_best_share()
280                 
281                 if len(shares) > 5:
282                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
283             
284             def handle_share_hashes(self, hashes, peer):
285                 t = time.time()
286                 get_hashes = []
287                 for share_hash in hashes:
288                     if share_hash in tracker.items:
289                         continue
290                     last_request_time, count = requested.get(share_hash, (None, 0))
291                     if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
292                         continue
293                     print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
294                     get_hashes.append(share_hash)
295                     requested[share_hash] = t, count + 1
296                 
297                 if hashes and peer is not None:
298                     peer_heads.setdefault(hashes[0], set()).add(peer)
299                 if get_hashes:
300                     peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
301             
302             def handle_get_shares(self, hashes, parents, stops, peer):
303                 parents = min(parents, 1000//len(hashes)) if hashes else 0
304                 stops = set(stops)
305                 shares = []
306                 for share_hash in hashes:
307                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
308                         if share.hash in stops:
309                             break
310                         shares.append(share)
311                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
312                 return shares
313             
314             def handle_bestblock(self, header, peer):
315                 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
316                     raise p2p.PeerMisbehavingError('received block header fails PoW test')
317                 handle_header(header)
318         
319         @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
320         def submit_block_p2p(block):
321             if factory.conn.value is None:
322                 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
323                 raise deferral.RetrySilentlyException()
324             factory.conn.value.send_block(block=block)
325         
326         @deferral.retry('Error submitting block: (will retry)', 10, 10)
327         @defer.inlineCallbacks
328         def submit_block_rpc(block, ignore_failure):
329             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
330             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
331             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
332                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
333         
334         def submit_block(block, ignore_failure):
335             submit_block_p2p(block)
336             submit_block_rpc(block, ignore_failure)
337         
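            # whenever a verified share is added, check whether its proof of work also
            # meets the bitcoin block target; if so, submit it as a block and spread it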
338         @tracker.verified.added.watch
339         def _(share):
340             if share.pow_hash <= share.header['bits'].target:
341                 submit_block(share.as_block(tracker), ignore_failure=True)
342                 print
343                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
344                 print
345                 def spread():
346                     if (get_height_rel_highest(share.header['previous_block']) > -5 or
347                         bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
348                         broadcast_share(share.hash)
349                 spread()
350                 reactor.callLater(5, spread) # so get_height_rel_highest can update
351         
352         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
353         
354         @defer.inlineCallbacks
355         def parse(x):
356             if ':' in x:
357                 ip, port = x.split(':')
358                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
359             else:
360                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
361         
362         addrs = {}
363         if os.path.exists(os.path.join(datadir_path, 'addrs')):
364             try:
365                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
366                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
367             except:
368                 print >>sys.stderr, 'error parsing addrs'
369         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
370             try:
371                 addr = yield addr_df
372                 if addr not in addrs:
373                     addrs[addr] = (0, time.time(), time.time())
374             except:
375                 log.err()
376         
377         connect_addrs = set()
378         for addr_df in map(parse, args.p2pool_nodes):
379             try:
380                 connect_addrs.add((yield addr_df))
381             except:
382                 log.err()
383         
384         p2p_node = Node(
385             best_share_hash_func=lambda: best_share_var.value,
386             port=args.p2pool_port,
387             net=net,
388             addr_store=addrs,
389             connect_addrs=connect_addrs,
390             max_incoming_conns=args.p2pool_conns,
391         )
392         p2p_node.start()
393         
394         def save_addrs():
395             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
396                 f.write(json.dumps(p2p_node.addr_store.items()))
397         task.LoopingCall(save_addrs).start(60)
398         
399         @best_block_header.changed.watch
400         def _(header):
401             for peer in p2p_node.peers.itervalues():
402                 peer.send_bestblock(header=header)
403         
404         @defer.inlineCallbacks
405         def broadcast_share(share_hash):
406             shares = []
407             for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
408                 if share.hash in shared_share_hashes:
409                     break
410                 shared_share_hashes.add(share.hash)
411                 shares.append(share)
412             
413             for peer in list(p2p_node.peers.itervalues()):
414                 yield peer.sendShares([share for share in shares if share.peer is not peer])
415         
416         # send share when the chain changes to their chain
417         best_share_var.changed.watch(broadcast_share)
418         
419         def save_shares():
420             for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
421                 ss.add_share(share)
422                 if share.hash in tracker.verified.items:
423                     ss.add_verified_hash(share.hash)
424         task.LoopingCall(save_shares).start(60)
425         
426         print '    ...success!'
427         print
428         
429         if args.upnp:
430             @defer.inlineCallbacks
431             def upnp_thread():
432                 while True:
433                     try:
434                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
435                         if is_lan:
436                             pm = yield portmapper.get_port_mapper()
437                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
438                     except defer.TimeoutError:
439                         pass
440                     except:
441                         if p2pool.DEBUG:
442                             log.err(None, 'UPnP error:')
443                     yield deferral.sleep(random.expovariate(1/120))
444             upnp_thread()
445         
446         # start listening for workers with a JSON-RPC server
447         
448         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
449         
450         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
451         
452         wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
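            # bitcoind_work and bitcoind_warning_var are handed to the web root so that the
            # warnings derived from them (per this commit, including 'lost contact with
            # bitcoind') are shown on the web GUI as well as on the console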
453         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var)
454         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
455         
456         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
457         
458         with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
459             pass
460         
461         print '    ...success!'
462         print
463         
464         
465         # done!
466         print 'Started successfully!'
467         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
468         if args.donation_percentage > 0.51:
469             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
470         elif args.donation_percentage < 0.49:
471             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
472         else:
473             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
474             print 'You can increase this amount with the --give-author argument! (or decrease it, if you must)'
475         print
476         
477         
478         if hasattr(signal, 'SIGALRM'):
479             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
480                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
481             ))
482             signal.siginterrupt(signal.SIGALRM, False)
483             task.LoopingCall(signal.alarm, 30).start(1)
484         
485         if args.irc_announce:
486             from twisted.words.protocols import irc
487             class IRCClient(irc.IRCClient):
488                 nickname = 'p2pool%02i' % (random.randrange(100),)
489                 channel = net.ANNOUNCE_CHANNEL
490                 def lineReceived(self, line):
491                     if p2pool.DEBUG:
492                         print repr(line)
493                     irc.IRCClient.lineReceived(self, line)
494                 def signedOn(self):
495                     irc.IRCClient.signedOn(self)
496                     self.factory.resetDelay()
497                     self.join(self.channel)
498                     @defer.inlineCallbacks
499                     def new_share(share):
500                         if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
501                             yield deferral.sleep(random.expovariate(1/60))
502                             message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
503                             if message not in self.recent_messages:
504                                 self.say(self.channel, message)
505                                 self._remember_message(message)
506                     self.watch_id = tracker.verified.added.watch(new_share)
507                     self.recent_messages = []
508                 def _remember_message(self, message):
509                     self.recent_messages.append(message)
510                     while len(self.recent_messages) > 100:
511                         self.recent_messages.pop(0)
512                 def privmsg(self, user, channel, message):
513                     if channel == self.channel:
514                         self._remember_message(message)
515                 def connectionLost(self, reason):
516                     tracker.verified.added.unwatch(self.watch_id)
517                     print 'IRC connection lost:', reason.getErrorMessage()
518             class IRCClientFactory(protocol.ReconnectingClientFactory):
519                 protocol = IRCClient
520             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
521         
522         @defer.inlineCallbacks
523         def status_thread():
524             last_str = None
525             last_time = 0
526             while True:
527                 yield deferral.sleep(3)
528                 try:
529                     height = tracker.get_height(best_share_var.value)
530                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
531                         height,
532                         len(tracker.verified.items),
533                         len(tracker.items),
534                         len(p2p_node.peers),
535                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
536                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
537                     
538                     datums, dt = wb.local_rate_monitor.get_datums_in_last()
539                     my_att_s = sum(datum['work']/dt for datum in datums)
540                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
541                         math.format(int(my_att_s)),
542                         math.format_dt(dt),
543                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
544                         math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
545                     )
546                     
547                     if height > 2:
548                         (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
549                         stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
550                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
551                         
552                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
553                             shares, stale_orphan_shares, stale_doa_shares,
554                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
555                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
556                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
557                         )
558                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
559                             math.format(int(real_att_s)),
560                             100*stale_prop,
561                             math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
562                         )
563                         
564                         for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
565                             print >>sys.stderr, '#'*40
566                             print >>sys.stderr, '>>> Warning: ' + warning
567                             print >>sys.stderr, '#'*40
568                     
569                     if this_str != last_str or time.time() > last_time + 15:
570                         print this_str
571                         last_str = this_str
572                         last_time = time.time()
573                 except:
574                     log.err()
575         status_thread()
576     except:
577         reactor.stop()
578         log.err(None, 'Fatal error:')
579
580 def run():
581     realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
582     
583     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
584     parser.add_argument('--version', action='version', version=p2pool.__version__)
585     parser.add_argument('--net',
586         help='use specified network (default: bitcoin)',
587         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
588     parser.add_argument('--testnet',
589         help='''use the network's testnet''',
590         action='store_const', const=True, default=False, dest='testnet')
591     parser.add_argument('--debug',
592         help='enable debugging mode',
593         action='store_const', const=True, default=False, dest='debug')
594     parser.add_argument('-a', '--address',
595         help='generate payouts to this address (default: <address requested from bitcoind>)',
596         type=str, action='store', default=None, dest='address')
597     parser.add_argument('--datadir',
598         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
599         type=str, action='store', default=None, dest='datadir')
600     parser.add_argument('--logfile',
601         help='''log to this file (default: data/<NET>/log)''',
602         type=str, action='store', default=None, dest='logfile')
603     parser.add_argument('--merged',
604         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
605         type=str, action='append', default=[], dest='merged_urls')
606     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
607         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
608         type=float, action='store', default=0.5, dest='donation_percentage')
609     parser.add_argument('--iocp',
610         help='use the Windows IOCP API to avoid errors caused by a large number of open sockets',
611         action='store_true', default=False, dest='iocp')
612     parser.add_argument('--irc-announce',
613         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
614         action='store_true', default=False, dest='irc_announce')
615     parser.add_argument('--no-bugreport',
616         help='disable submitting caught exceptions to the author',
617         action='store_true', default=False, dest='no_bugreport')
618     
619     p2pool_group = parser.add_argument_group('p2pool interface')
620     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
621         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
622         type=int, action='store', default=None, dest='p2pool_port')
623     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
624         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
625         type=str, action='append', default=[], dest='p2pool_nodes')
626     parser.add_argument('--disable-upnp',
627         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
628         action='store_false', default=True, dest='upnp')
629     p2pool_group.add_argument('--max-conns', metavar='CONNS',
630         help='maximum incoming connections (default: 40)',
631         type=int, action='store', default=40, dest='p2pool_conns')
632     
633     worker_group = parser.add_argument_group('worker interface')
634     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
635         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
636         type=str, action='store', default=None, dest='worker_endpoint')
637     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
638         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
639         type=float, action='store', default=0, dest='worker_fee')
640     
641     bitcoind_group = parser.add_argument_group('bitcoind interface')
642     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
643         help='connect to this address (default: 127.0.0.1)',
644         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
645     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
646         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
647         type=int, action='store', default=None, dest='bitcoind_rpc_port')
648     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
649         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
650         type=int, action='store', default=None, dest='bitcoind_p2p_port')
651     
652     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
653         help='bitcoind RPC interface username, then password, space-separated (if only one is given, the username defaults to empty; if neither is given, P2Pool reads them from bitcoin.conf)',
654         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
655     
656     args = parser.parse_args()
657     
658     if args.debug:
659         p2pool.DEBUG = True
660         defer.setDebugging(True)
661     
662     net_name = args.net_name + ('_testnet' if args.testnet else '')
663     net = networks.nets[net_name]
664     
665     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
666     if not os.path.exists(datadir_path):
667         os.makedirs(datadir_path)
668     
669     if len(args.bitcoind_rpc_userpass) > 2:
670         parser.error('a maximum of two arguments are allowed')
671     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
672     
673     if args.bitcoind_rpc_password is None:
674         conf_path = net.PARENT.CONF_FILE_FUNC()
675         if not os.path.exists(conf_path):
676             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
677                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
678                 '''\r\n'''
679                 '''server=1\r\n'''
680                 '''rpcpassword=%x\r\n'''
681                 '''\r\n'''
682                 '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
683         with open(conf_path, 'rb') as f:
684             cp = ConfigParser.RawConfigParser()
685             cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
686             for conf_name, var_name, var_type in [
687                 ('rpcuser', 'bitcoind_rpc_username', str),
688                 ('rpcpassword', 'bitcoind_rpc_password', str),
689                 ('rpcport', 'bitcoind_rpc_port', int),
690                 ('port', 'bitcoind_p2p_port', int),
691             ]:
692                 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
693                     setattr(args, var_name, var_type(cp.get('x', conf_name)))
694         if args.bitcoind_rpc_password is None:
695             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
696     
697     if args.bitcoind_rpc_username is None:
698         args.bitcoind_rpc_username = ''
699     
700     if args.bitcoind_rpc_port is None:
701         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
702     
703     if args.bitcoind_p2p_port is None:
704         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
705     
706     if args.p2pool_port is None:
707         args.p2pool_port = net.P2P_PORT
708     
709     if args.worker_endpoint is None:
710         worker_endpoint = '', net.WORKER_PORT
711     elif ':' not in args.worker_endpoint:
712         worker_endpoint = '', int(args.worker_endpoint)
713     else:
714         addr, port = args.worker_endpoint.rsplit(':', 1)
715         worker_endpoint = addr, int(port)
716     
717     if args.address is not None:
718         try:
719             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
720         except Exception, e:
721             parser.error('error parsing address: ' + repr(e))
722     else:
723         args.pubkey_hash = None
724     
725     def separate_url(url):
726         s = urlparse.urlsplit(url)
727         if '@' not in s.netloc:
728             parser.error('merged url netloc must contain an "@"')
729         userpass, new_netloc = s.netloc.rsplit('@', 1)
730         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
731     merged_urls = map(separate_url, args.merged_urls)
732     
733     if args.logfile is None:
734         args.logfile = os.path.join(datadir_path, 'log')
735     
736     logfile = logging.LogFile(args.logfile)
737     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
738     sys.stdout = logging.AbortPipe(pipe)
739     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
740     if hasattr(signal, "SIGUSR1"):
741         def sigusr1(signum, frame):
742             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
743             logfile.reopen()
744             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
745         signal.signal(signal.SIGUSR1, sigusr1)
746     task.LoopingCall(logfile.reopen).start(5)
747     
748     class ErrorReporter(object):
749         def __init__(self):
750             self.last_sent = None
751         
752         def emit(self, eventDict):
753             if not eventDict["isError"]:
754                 return
755             
756             if self.last_sent is not None and time.time() < self.last_sent + 5:
757                 return
758             self.last_sent = time.time()
759             
760             if 'failure' in eventDict:
761                 text = ((eventDict.get('why') or 'Unhandled Error')
762                     + '\n' + eventDict['failure'].getTraceback())
763             else:
764                 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
765             
766             from twisted.web import client
767             client.getPage(
768                 url='http://u.forre.st/p2pool_error.cgi',
769                 method='POST',
770                 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
771                 timeout=15,
772             ).addBoth(lambda x: None)
773     if not args.no_bugreport:
774         log.addObserver(ErrorReporter().emit)
775     
776     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
777     reactor.run()