b89903ab73a6ef4c1c1357cf961976f543eb9fe6
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import base64
6 import json
7 import os
8 import random
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 if '--iocp' in sys.argv:
16     from twisted.internet import iocpreactor
17     iocpreactor.install()
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import fixargparse, jsonrpc, variable, deferral, math, logging
26 from . import p2p, networks, web, work
27 import p2pool, p2pool.data as p2pool_data
28
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, use_getblocktemplate=False):
    """Ask bitcoind for a block template and return it as a normalised dict.

    bitcoind -- JSON-RPC proxy exposing the rpc_* methods called below.
    use_getblocktemplate -- when true, getblocktemplate is tried first,
        otherwise getmemorypool. If the first call is answered with JSON-RPC
        error -32601 (method not found) the other call is tried, and the
        'use_getblocktemplate' entry of the result records which one was
        used last.

    The deferred fires with a dict holding: version, previous_block,
    transactions, merkle_link, subsidy, time, bits, coinbaseflags, height,
    last_update and use_getblocktemplate.

    If both RPC methods are missing, a message is written to stderr and
    deferral.RetrySilentlyException is raised.
    """
    def request_template():
        # Looks the flag up at call time, so flipping it below switches
        # which RPC method the next call issues.
        if not use_getblocktemplate:
            return bitcoind.rpc_getmemorypool()
        return bitcoind.rpc_getblocktemplate(dict(mode='template'))

    try:
        work = yield request_template()
    except jsonrpc.Error_for_code(-32601): # Method not found
        use_getblocktemplate = not use_getblocktemplate
        try:
            work = yield request_template()
        except jsonrpc.Error_for_code(-32601): # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()

    # Transactions arrive either as dicts carrying a hex 'data' field or as
    # bare hex strings; decode both forms to raw bytes.
    packed_transactions = []
    for entry in work['transactions']:
        hex_tx = entry['data'] if isinstance(entry, dict) else entry
        packed_transactions.append(hex_tx.decode('hex'))

    if 'height' in work:
        if p2pool.DEBUG:
            # Cross-check the reported height against the parent block.
            assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    else:
        # No height in the reply: derive it from the parent block.
        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1

    version = work['version']
    previous_block = int(work['previousblockhash'], 16)
    transactions = map(bitcoin_data.tx_type.unpack, packed_transactions)
    # The leading None stands in for the coinbase slot at index 0.
    merkle_link = bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0)
    subsidy = work['coinbasevalue']

    if 'time' in work:
        block_time = work['time']
    else:
        block_time = work['curtime']

    # 'bits' is either a hex string (byte-reversed before unpacking) or a
    # plain number.
    raw_bits = work['bits']
    if isinstance(raw_bits, (str, unicode)):
        bits = bitcoin_data.FloatingIntegerType().unpack(raw_bits.decode('hex')[::-1])
    else:
        bits = bitcoin_data.FloatingInteger(raw_bits)

    if 'coinbaseflags' in work:
        coinbaseflags = work['coinbaseflags'].decode('hex')
    elif 'coinbaseaux' in work:
        coinbaseflags = ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues())
    else:
        coinbaseflags = ''

    defer.returnValue(dict(
        version=version,
        previous_block=previous_block,
        transactions=transactions,
        merkle_link=merkle_link,
        subsidy=subsidy,
        time=block_time,
        bits=bits,
        coinbaseflags=coinbaseflags,
        height=work['height'],
        last_update=time.time(),
        use_getblocktemplate=use_getblocktemplate,
    ))
64
65 @defer.inlineCallbacks
66 def main(args, net, datadir_path, merged_urls, worker_endpoint):
67     try:
68         print 'p2pool (version %s)' % (p2pool.__version__,)
69         print
70         
71         traffic_happened = variable.Event()
72         
73         # connect to bitcoind over JSON-RPC and do initial getmemorypool
74         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
75         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
76         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
77         @deferral.retry('Error while checking Bitcoin connection:', 1)
78         @defer.inlineCallbacks
79         def check():
80             if not (yield net.PARENT.RPC_CHECK(bitcoind)):
81                 print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
82                 raise deferral.RetrySilentlyException()
83             if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
84                 print >>sys.stderr, '    Bitcoin version too old! Upgrade to 0.6.4 or newer!'
85                 raise deferral.RetrySilentlyException()
86         yield check()
87         temp_work = yield getwork(bitcoind)
88         
89         block_height_var = variable.Variable(None)
90         @defer.inlineCallbacks
91         def poll_height():
92             block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
93         yield poll_height()
94         task.LoopingCall(poll_height).start(60*60)
95         
96         bitcoind_warning_var = variable.Variable(None)
97         @defer.inlineCallbacks
98         def poll_warnings():
99             errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
100             bitcoind_warning_var.set(errors if errors != '' else None)
101         yield poll_warnings()
102         task.LoopingCall(poll_warnings).start(20*60)
103         
104         print '    ...success!'
105         print '    Current block hash: %x' % (temp_work['previous_block'],)
106         print '    Current block height: %i' % (block_height_var.value,)
107         print
108         
109         # connect to bitcoind over bitcoin-p2p
110         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
111         factory = bitcoin_p2p.ClientFactory(net.PARENT)
112         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
113         yield factory.getProtocol() # waits until handshake is successful
114         print '    ...success!'
115         print
116         
117         print 'Determining payout address...'
118         if args.pubkey_hash is None:
119             address_path = os.path.join(datadir_path, 'cached_payout_address')
120             
121             if os.path.exists(address_path):
122                 with open(address_path, 'rb') as f:
123                     address = f.read().strip('\r\n')
124                 print '    Loaded cached address: %s...' % (address,)
125             else:
126                 address = None
127             
128             if address is not None:
129                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
130                 if not res['isvalid'] or not res['ismine']:
131                     print '    Cached address is either invalid or not controlled by local bitcoind!'
132                     address = None
133             
134             if address is None:
135                 print '    Getting payout address from bitcoind...'
136                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
137             
138             with open(address_path, 'wb') as f:
139                 f.write(address)
140             
141             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
142         else:
143             my_pubkey_hash = args.pubkey_hash
144         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
145         print
146         
147         my_share_hashes = set()
148         my_doa_share_hashes = set()
149         
150         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
151         shared_share_hashes = set()
152         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
153         known_verified = set()
154         print "Loading shares..."
155         for i, (mode, contents) in enumerate(ss.get_shares()):
156             if mode == 'share':
157                 if contents.hash in tracker.items:
158                     continue
159                 shared_share_hashes.add(contents.hash)
160                 contents.time_seen = 0
161                 tracker.add(contents)
162                 if len(tracker.items) % 1000 == 0 and tracker.items:
163                     print "    %i" % (len(tracker.items),)
164             elif mode == 'verified_hash':
165                 known_verified.add(contents)
166             else:
167                 raise AssertionError()
168         print "    ...inserting %i verified shares..." % (len(known_verified),)
169         for h in known_verified:
170             if h not in tracker.items:
171                 ss.forget_verified_share(h)
172                 continue
173             tracker.verified.add(tracker.items[h])
174         print "    ...done loading %i shares!" % (len(tracker.items),)
175         print
176         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
177         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
178         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
179         
180         print 'Initializing work...'
181         
182         
183         # BITCOIND WORK
184         
185         bitcoind_work = variable.Variable((yield getwork(bitcoind)))
186         @defer.inlineCallbacks
187         def work_poller():
188             while True:
189                 flag = factory.new_block.get_deferred()
190                 try:
191                     bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
192                 except:
193                     log.err()
194                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
195         work_poller()
196         
197         # PEER WORK
198         
199         best_block_header = variable.Variable(None)
200         def handle_header(new_header):
201             # check that header matches current target
202             if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
203                 return
204             bitcoind_best_block = bitcoind_work.value['previous_block']
205             if (best_block_header.value is None
206                 or (
207                     new_header['previous_block'] == bitcoind_best_block and
208                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
209                 ) # new is child of current and previous is current
210                 or (
211                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
212                     best_block_header.value['previous_block'] != bitcoind_best_block
213                 )): # new is current and previous is not a child of current
214                 best_block_header.set(new_header)
215         @defer.inlineCallbacks
216         def poll_header():
217             handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
218         bitcoind_work.changed.watch(lambda _: poll_header())
219         yield deferral.retry('Error while requesting best block header:')(poll_header)()
220         
221         # BEST SHARE
222         
223         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
224         
225         best_share_var = variable.Variable(None)
226         desired_var = variable.Variable(None)
227         def set_best_share():
228             best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
229             
230             best_share_var.set(best)
231             desired_var.set(desired)
232         bitcoind_work.changed.watch(lambda _: set_best_share())
233         set_best_share()
234         
235         print '    ...success!'
236         print
237         
238         # setup p2p logic and join p2pool network
239         
240         class Node(p2p.Node):
241             def handle_shares(self, shares, peer):
242                 if len(shares) > 5:
243                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
244                 
245                 new_count = 0
246                 for share in shares:
247                     if share.hash in tracker.items:
248                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
249                         continue
250                     
251                     new_count += 1
252                     
253                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
254                     
255                     tracker.add(share)
256                 
257                 if new_count:
258                     set_best_share()
259                 
260                 if len(shares) > 5:
261                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
262             
263             @defer.inlineCallbacks
264             def handle_share_hashes(self, hashes, peer):
265                 new_hashes = [x for x in hashes if x not in tracker.items]
266                 if not new_hashes:
267                     return
268                 try:
269                     shares = yield peer.get_shares(
270                         hashes=new_hashes,
271                         parents=0,
272                         stops=[],
273                     )
274                 except:
275                     log.err(None, 'in handle_share_hashes:')
276                 else:
277                     self.handle_shares(shares, peer)
278             
279             def handle_get_shares(self, hashes, parents, stops, peer):
280                 parents = min(parents, 1000//len(hashes))
281                 stops = set(stops)
282                 shares = []
283                 for share_hash in hashes:
284                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
285                         if share.hash in stops:
286                             break
287                         shares.append(share)
288                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
289                 return shares
290             
291             def handle_bestblock(self, header, peer):
292                 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
293                     raise p2p.PeerMisbehavingError('received block header fails PoW test')
294                 handle_header(header)
295         
296         @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
297         def submit_block_p2p(block):
298             if factory.conn.value is None:
299                 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
300                 raise deferral.RetrySilentlyException()
301             factory.conn.value.send_block(block=block)
302         
303         @deferral.retry('Error submitting block: (will retry)', 10, 10)
304         @defer.inlineCallbacks
305         def submit_block_rpc(block, ignore_failure):
306             if bitcoind_work.value['use_getblocktemplate']:
307                 result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
308                 success = result is None
309             else:
310                 result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
311                 success = result
312             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
313             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
314                 print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
315         
316         def submit_block(block, ignore_failure):
317             submit_block_p2p(block)
318             submit_block_rpc(block, ignore_failure)
319         
320         @tracker.verified.added.watch
321         def _(share):
322             if share.pow_hash <= share.header['bits'].target:
323                 submit_block(share.as_block(tracker), ignore_failure=True)
324                 print
325                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
326                 print
327                 def spread():
328                     if (get_height_rel_highest(share.header['previous_block']) > -5 or
329                         bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
330                         broadcast_share(share.hash)
331                 spread()
332                 reactor.callLater(5, spread) # so get_height_rel_highest can update
333         
334         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
335         
336         @defer.inlineCallbacks
337         def parse(x):
338             if ':' in x:
339                 ip, port = x.split(':')
340                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
341             else:
342                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
343         
344         addrs = {}
345         if os.path.exists(os.path.join(datadir_path, 'addrs')):
346             try:
347                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
348                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
349             except:
350                 print >>sys.stderr, 'error parsing addrs'
351         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
352             try:
353                 addr = yield addr_df
354                 if addr not in addrs:
355                     addrs[addr] = (0, time.time(), time.time())
356             except:
357                 log.err()
358         
359         connect_addrs = set()
360         for addr_df in map(parse, args.p2pool_nodes):
361             try:
362                 connect_addrs.add((yield addr_df))
363             except:
364                 log.err()
365         
366         p2p_node = Node(
367             best_share_hash_func=lambda: best_share_var.value,
368             port=args.p2pool_port,
369             net=net,
370             addr_store=addrs,
371             connect_addrs=connect_addrs,
372             max_incoming_conns=args.p2pool_conns,
373             traffic_happened=traffic_happened,
374         )
375         p2p_node.start()
376         
377         def save_addrs():
378             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
379                 f.write(json.dumps(p2p_node.addr_store.items()))
380         task.LoopingCall(save_addrs).start(60)
381         
382         @best_block_header.changed.watch
383         def _(header):
384             for peer in p2p_node.peers.itervalues():
385                 peer.send_bestblock(header=header)
386         
387         @defer.inlineCallbacks
388         def broadcast_share(share_hash):
389             shares = []
390             for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
391                 if share.hash in shared_share_hashes:
392                     break
393                 shared_share_hashes.add(share.hash)
394                 shares.append(share)
395             
396             for peer in list(p2p_node.peers.itervalues()):
397                 yield peer.sendShares([share for share in shares if share.peer is not peer])
398         
399         # send share when the chain changes to their chain
400         best_share_var.changed.watch(broadcast_share)
401         
402         def save_shares():
403             for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
404                 ss.add_share(share)
405                 if share.hash in tracker.verified.items:
406                     ss.add_verified_hash(share.hash)
407         task.LoopingCall(save_shares).start(60)
408         
409         @apply
410         @defer.inlineCallbacks
411         def download_shares():
412             while True:
413                 desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
414                 peer2, share_hash = random.choice(desired)
415                 
416                 if len(p2p_node.peers) == 0:
417                     yield deferral.sleep(1)
418                     continue
419                 peer = random.choice(p2p_node.peers.values())
420                 
421                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
422                 try:
423                     shares = yield peer.get_shares(
424                         hashes=[share_hash],
425                         parents=500,
426                         stops=[],
427                     )
428                 except:
429                     log.err(None, 'in download_shares:')
430                     continue
431                 
432                 if not shares:
433                     yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
434                     continue
435                 p2p_node.handle_shares(shares, peer)
436         
437         print '    ...success!'
438         print
439         
440         if args.upnp:
441             @defer.inlineCallbacks
442             def upnp_thread():
443                 while True:
444                     try:
445                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
446                         if is_lan:
447                             pm = yield portmapper.get_port_mapper()
448                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
449                     except defer.TimeoutError:
450                         pass
451                     except:
452                         if p2pool.DEBUG:
453                             log.err(None, 'UPnP error:')
454                     yield deferral.sleep(random.expovariate(1/120))
455             upnp_thread()
456         
457         # start listening for workers with a JSON-RPC server
458         
459         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
460         
461         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
462         
463         wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
464         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened)
465         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
466         
467         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
468         
469         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
470             pass
471         
472         print '    ...success!'
473         print
474         
475         
476         # done!
477         print 'Started successfully!'
478         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
479         if args.donation_percentage > 0.51:
480             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
481         elif args.donation_percentage < 0.49:
482             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
483         else:
484             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
485             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
486         print
487         
488         
489         if hasattr(signal, 'SIGALRM'):
490             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
491                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
492             ))
493             signal.siginterrupt(signal.SIGALRM, False)
494             task.LoopingCall(signal.alarm, 30).start(1)
495         
496         if args.irc_announce:
497             from twisted.words.protocols import irc
498             class IRCClient(irc.IRCClient):
499                 nickname = 'p2pool%02i' % (random.randrange(100),)
500                 channel = net.ANNOUNCE_CHANNEL
501                 def lineReceived(self, line):
502                     if p2pool.DEBUG:
503                         print repr(line)
504                     irc.IRCClient.lineReceived(self, line)
505                 def signedOn(self):
506                     irc.IRCClient.signedOn(self)
507                     self.factory.resetDelay()
508                     self.join(self.channel)
509                     @defer.inlineCallbacks
510                     def new_share(share):
511                         if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
512                             yield deferral.sleep(random.expovariate(1/60))
513                             message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
514                             if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
515                                 self.say(self.channel, message)
516                                 self._remember_message(message)
517                     self.watch_id = tracker.verified.added.watch(new_share)
518                     self.recent_messages = []
519                 def _remember_message(self, message):
520                     self.recent_messages.append(message)
521                     while len(self.recent_messages) > 100:
522                         self.recent_messages.pop(0)
523                 def privmsg(self, user, channel, message):
524                     if channel == self.channel:
525                         self._remember_message(message)
526                 def connectionLost(self, reason):
527                     tracker.verified.added.unwatch(self.watch_id)
528                     print 'IRC connection lost:', reason.getErrorMessage()
529             class IRCClientFactory(protocol.ReconnectingClientFactory):
530                 protocol = IRCClient
531             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
532         
533         @defer.inlineCallbacks
534         def status_thread():
535             last_str = None
536             last_time = 0
537             while True:
538                 yield deferral.sleep(3)
539                 try:
540                     height = tracker.get_height(best_share_var.value)
541                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
542                         height,
543                         len(tracker.verified.items),
544                         len(tracker.items),
545                         len(p2p_node.peers),
546                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
547                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
548                     
549                     datums, dt = wb.local_rate_monitor.get_datums_in_last()
550                     my_att_s = sum(datum['work']/dt for datum in datums)
551                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
552                         math.format(int(my_att_s)),
553                         math.format_dt(dt),
554                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
555                         math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
556                     )
557                     
558                     if height > 2:
559                         (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
560                         stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
561                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
562                         
563                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
564                             shares, stale_orphan_shares, stale_doa_shares,
565                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
566                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
567                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
568                         )
569                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
570                             math.format(int(real_att_s)),
571                             100*stale_prop,
572                             math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
573                         )
574                         
575                         for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
576                             print >>sys.stderr, '#'*40
577                             print >>sys.stderr, '>>> Warning: ' + warning
578                             print >>sys.stderr, '#'*40
579                     
580                     if this_str != last_str or time.time() > last_time + 15:
581                         print this_str
582                         last_str = this_str
583                         last_time = time.time()
584                 except:
585                     log.err()
586         status_thread()
587     except:
588         reactor.stop()
589         log.err(None, 'Fatal error:')
590
def run():
    """Command-line entry point for p2pool.

    Parses the command line, fills in any option left unset from the selected
    network's defaults and (when no RPC password was given) from the parent
    coin's configuration file, redirects stdout/stderr into the log pipeline,
    optionally installs the remote error reporter, and finally schedules
    main() and runs the Twisted reactor.

    Invalid or missing settings are reported through parser.error(), which
    prints a usage message and exits the process.
    """
    # Testnets are selected with --testnet rather than listed under --net.
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)

    parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--iocp',
        help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
        action='store_true', default=False, dest='iocp')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    parser.add_argument('--no-bugreport',
        help='disable submitting caught exceptions to the author',
        action='store_true', default=False, dest='no_bugreport')

    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    # Registered on the p2pool group (not the top-level parser) so that --help
    # lists it together with the other P2P-port options.
    p2pool_group.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')

    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')

    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')

    # Positional arguments: zero, one (password) or two (username, password).
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')

    args = parser.parse_args()

    if args.debug:
        p2pool.DEBUG = True
        defer.setDebugging(True)

    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]

    # Each network gets its own subdirectory under the data directory.
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)

    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    # Left-pad with None so that a single positional argument is taken as the
    # password and the username stays unset.
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]

    if args.bitcoind_rpc_password is None:
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            # The suggested password is a secret, so draw it from the OS
            # entropy source instead of the default Mersenne Twister.
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x\r\n'''
                '''\r\n'''
                '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.SystemRandom().randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            # The conf file has no section headers, which ConfigParser
            # requires, so a dummy '[x]' section is prepended.
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                # Values given on the command line win over the conf file.
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')

    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''

    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT

    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT

    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT

    # worker_endpoint is an (interface address, port) pair; '' as the address
    # is used when no ADDR was given.
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    else:
        if ':' in args.worker_endpoint:
            addr, port = args.worker_endpoint.rsplit(':', 1)
        else:
            addr, port = '', args.worker_endpoint
        try:
            worker_endpoint = addr, int(port)
        except ValueError:
            # Report a bad port like any other bad argument instead of
            # letting the ValueError escape as a traceback.
            parser.error('invalid worker port: %r' % (port,))

    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception as e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None

    def separate_url(url):
        """Split a merged-mining URL into (url without credentials, 'user:pass')."""
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        # rsplit so that an '@' inside the credentials stays with them.
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)

    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')

    # From here on stdout and stderr are replaced: both are routed through
    # `pipe`, which is built from the original stderr and the log file, and
    # the new stderr additionally goes through a '> ' PrefixPipe.
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    # SIGUSR1 does not exist on every platform, hence the hasattr check.
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            """Signal handler: reopen the log file on demand."""
            print('Caught SIGUSR1, closing %r...' % (args.logfile,))
            logfile.reopen()
            print('...and reopened %r after catching SIGUSR1.' % (args.logfile,))
        signal.signal(signal.SIGUSR1, sigusr1)
    # Independently of the signal, reopen the log file every 5 seconds.
    task.LoopingCall(logfile.reopen).start(5)

    class ErrorReporter(object):
        """Twisted log observer that POSTs logged errors to the author's server."""

        def __init__(self):
            # time.time() of the last report sent, or None if none was sent yet.
            self.last_sent = None

        def emit(self, eventDict):
            """Send the error described by eventDict, at most once per 5 seconds."""
            if not eventDict["isError"]:
                return

            # Rate limit: drop errors arriving within 5 seconds of the last report.
            if self.last_sent is not None and time.time() < self.last_sent + 5:
                return
            self.last_sent = time.time()

            if 'failure' in eventDict:
                text = ((eventDict.get('why') or 'Unhandled Error')
                    + '\n' + eventDict['failure'].getTraceback())
            else:
                text = " ".join([str(m) for m in eventDict["message"]]) + "\n"

            from twisted.web import client
            # The outcome of the request is discarded (addBoth swallows both
            # success and failure) so a failed report cannot log a new error.
            client.getPage(
                url='http://u.forre.st/p2pool_error.cgi',
                method='POST',
                postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
                timeout=15,
            ).addBoth(lambda x: None)
    if not args.no_bugreport:
        log.addObserver(ErrorReporter().emit)

    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()