fixed block hash format specifier for when block submittal is attempted with no bitcoind connection
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import base64
6 import json
7 import os
8 import random
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 if '--iocp' in sys.argv:
16     from twisted.internet import iocpreactor
17     iocpreactor.install()
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import fixargparse, jsonrpc, variable, deferral, math, logging
26 from . import p2p, networks, web, work
27 import p2pool, p2pool.data as p2pool_data
28
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, use_getblocktemplate=False):
    """Fetch a block template from bitcoind over JSON-RPC and normalize it.

    Tries getblocktemplate or getmemorypool depending on use_getblocktemplate;
    if the chosen method is not supported (-32601), toggles to the other one
    and retries once before giving up. Returns (via the Deferred) a dict with
    unpacked transactions, the merkle link for the coinbase slot, and the
    use_getblocktemplate flag that actually worked, so callers can pass it
    back in on subsequent polls.
    """
    def go():
        # Selects the RPC call based on the (possibly toggled) flag at call time.
        if use_getblocktemplate:
            return bitcoind.rpc_getblocktemplate(dict(mode='template'))
        else:
            return bitcoind.rpc_getmemorypool()
    try:
        work = yield go()
    except jsonrpc.Error_for_code(-32601): # Method not found
        # Flip to the other work-fetching RPC and retry once.
        use_getblocktemplate = not use_getblocktemplate
        try:
            work = yield go()
        except jsonrpc.Error_for_code(-32601): # Method not found
            # Neither RPC exists: daemon predates both interfaces.
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    # getblocktemplate returns transactions as dicts with a 'data' field;
    # getmemorypool returns bare hex strings. Normalize both to raw bytes.
    packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
    if 'height' not in work:
        # Older daemons omit height; derive it from the previous block's height.
        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    elif p2pool.DEBUG:
        assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    defer.returnValue(dict(
        version=work['version'],
        previous_block=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        # Merkle link for index 0, with None reserving the coinbase slot.
        merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'] if 'time' in work else work['curtime'],
        # 'bits' arrives as little-endian hex (string) or as an int, depending on RPC.
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
        height=work['height'],
        last_update=time.time(),
        use_getblocktemplate=use_getblocktemplate,
    ))
64
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    """Node startup coroutine.

    Connects to bitcoind over JSON-RPC and bitcoin-P2P, determines the payout
    address, loads the local share store, joins the p2pool P2P network, wires
    block submission and share broadcasting, starts the worker-facing HTTP/
    JSON-RPC server, and launches the background status/maintenance loops.
    Any exception escaping startup stops the reactor.
    """
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            # Verify we are talking to the right coin daemon and that it is new enough.
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
        temp_work = yield check()
        
        # Current chain height, refreshed hourly.
        block_height_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_height():
            block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
        yield poll_height()
        task.LoopingCall(poll_height).start(60*60)
        
        # bitcoind's self-reported warnings, refreshed every 20 minutes.
        bitcoind_warning_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_warnings():
            errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
            bitcoind_warning_var.set(errors if errors != '' else None)
        yield poll_warnings()
        task.LoopingCall(poll_warnings).start(20*60)
        
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block'],)
        print '    Current block height: %i' % (block_height_var.value,)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            # No address given on the command line: use (and cache) one from bitcoind.
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                # Re-validate the cached address against the running bitcoind.
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
        # Hashes of shares this node produced (and of its dead-on-arrival ones),
        # filled in by the worker bridge; used for stale-rate accounting.
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()  # shares already sent to (or received from) peers
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.items:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0  # loaded from disk, not freshly received
                tracker.add(contents)
                if len(tracker.items) % 1000 == 0 and tracker.items:
                    print "    %i" % (len(tracker.items),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.items:
                # Verified-hash record without a matching share: drop it from disk.
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.items[h])
        print "    ...done loading %i shares!" % (len(tracker.items),)
        print
        # Keep the on-disk store and the shared-set in sync as shares are pruned.
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        print 'Initializing work...'
        
        
        # BITCOIND WORK
        
        bitcoind_work = variable.Variable((yield getwork(bitcoind)))
        @defer.inlineCallbacks
        def work_poller():
            # Refresh work whenever bitcoind announces a new block, or at most
            # every 15 seconds, whichever comes first.
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        # PEER WORK
        
        best_block_header = variable.Variable(None)
        def handle_header(new_header):
            # check that header matches current target
            if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
                return
            bitcoind_best_block = bitcoind_work.value['previous_block']
            if (best_block_header.value is None
                or (
                    new_header['previous_block'] == bitcoind_best_block and
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
                ) # new is child of current and previous is current
                or (
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
                    best_block_header.value['previous_block'] != bitcoind_best_block
                )): # new is current and previous is not a child of current
                best_block_header.set(new_header)
        @defer.inlineCallbacks
        def poll_header():
            # Fetch the header of bitcoind's current best block over bitcoin-P2P.
            handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
        bitcoind_work.changed.watch(lambda _: poll_header())
        yield deferral.retry('Error while requesting best block header:')(poll_header)()
        
        # BEST SHARE
        
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
        
        best_share_var = variable.Variable(None)
        desired_var = variable.Variable(None)
        def set_best_share():
            # Re-evaluate the share chain head; 'desired' lists parent shares
            # we still need to download (consumed by download_shares below).
            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
            
            best_share_var.set(best)
            desired_var.set(desired)
        bitcoind_work.changed.watch(lambda _: set_best_share())
        set_best_share()
        
        print '    ...success!'
        print
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                # Insert newly received shares into the tracker, then re-pick
                # the best share if anything actually changed.
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.items:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if new_count:
                    set_best_share()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
            
            @defer.inlineCallbacks
            def handle_share_hashes(self, hashes, peer):
                # A peer advertised share hashes; fetch the ones we don't have.
                new_hashes = [x for x in hashes if x not in tracker.items]
                if not new_hashes:
                    return
                try:
                    shares = yield peer.get_shares(
                        hashes=new_hashes,
                        parents=0,
                        stops=[],
                    )
                except:
                    log.err(None, 'in handle_share_hashes:')
                else:
                    self.handle_shares(shares, peer)
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                # Serve shares to a peer; cap total response size via parents.
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
            
            def handle_bestblock(self, header, peer):
                # Reject headers that don't meet their own claimed target.
                if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
                    raise p2p.PeerMisbehavingError('received block header fails PoW test')
                handle_header(header)
        
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            # Push a solved block to bitcoind over the bitcoin-P2P connection.
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
        
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            # Also submit over JSON-RPC; the RPC used mirrors how work was fetched.
            success = yield (bitcoind.rpc_submitblock if bitcoind_work.value['use_getblocktemplate']
                else bitcoind.rpc_getmemorypool)(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        def submit_block(block, ignore_failure):
            # Submit via both channels for redundancy.
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
        
        @tracker.verified.added.watch
        def _(share):
            # A newly verified share that also meets the bitcoin target is a block:
            # forward it to bitcoind and spread it to p2pool peers.
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            # Resolve a "host[:port]" string to an (ip, port) tuple, defaulting
            # to the network's standard P2P port.
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        # Seed the address store from the on-disk cache plus bootstrap nodes.
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs')):
            try:
                with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
                    addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
            except:
                print >>sys.stderr, 'error parsing addrs'
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: best_share_var.value,
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
        
        def save_addrs():
            # Persist the peer address store once a minute.
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
        task.LoopingCall(save_addrs).start(60)
        
        @best_block_header.changed.watch
        def _(header):
            # Relay our view of the best bitcoin block header to all peers.
            for peer in p2p_node.peers.itervalues():
                peer.send_bestblock(header=header)
        
        @defer.inlineCallbacks
        def broadcast_share(share_hash):
            # Send up to 5 not-yet-shared ancestors of share_hash to every peer,
            # skipping each share's originating peer.
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in list(p2p_node.peers.itervalues()):
                yield peer.sendShares([share for share in shares if share.peer is not peer])
        
        # send share when the chain changes to their chain
        best_share_var.changed.watch(broadcast_share)
        
        def save_shares():
            # Periodically flush the current best chain (up to 2*CHAIN_LENGTH) to disk.
            for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.items:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        @apply
        @defer.inlineCallbacks
        def download_shares():
            # Background loop: fetch missing parent shares flagged by tracker.think()
            # from a randomly chosen peer.
            while True:
                desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
                peer2, share_hash = random.choice(desired)
                
                if len(p2p_node.peers) == 0:
                    yield deferral.sleep(1)
                    continue
                peer = random.choice(p2p_node.peers.values())
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                try:
                    shares = yield peer.get_shares(
                        hashes=[share_hash],
                        parents=500,
                        stops=[],
                    )
                except:
                    log.err(None, 'in download_shares:')
                else:
                    p2p_node.handle_shares(shares, peer)
        
        print '    ...success!'
        print
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                # Best-effort: keep a UPnP port mapping alive for the p2pool port,
                # re-checking on a randomized ~2-minute interval.
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
        
        wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
        web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var)
        worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        # Touch a flag file so wrapper scripts can tell startup completed.
        # NOTE(review): the nested os.path.join is redundant but harmless.
        with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
        print
        
        
        if hasattr(signal, 'SIGALRM'):
            # Watchdog: if the reactor thread stalls for 30s, SIGALRM fires and
            # dumps the current stack to stderr. Re-armed every second.
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                # Announces found blocks to the network's IRC channel.
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        # Announce recent blocks after a random delay, unless
                        # another node already announced the same block hash.
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    # Keep a bounded window of recent channel traffic for dedup.
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
        @defer.inlineCallbacks
        def status_thread():
            # Console status loop: every 3s build a summary string and print it
            # when it changes (or at least every 15s).
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    height = tracker.get_height(best_share_var.value)
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.items),
                        len(tracker.items),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = wb.local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
                        )
                        
                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        # Any startup failure is fatal: stop the reactor, then log it.
        reactor.stop()
        log.err(None, 'Fatal error:')
580
581 def run():
582     realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
583     
584     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
585     parser.add_argument('--version', action='version', version=p2pool.__version__)
586     parser.add_argument('--net',
587         help='use specified network (default: bitcoin)',
588         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
589     parser.add_argument('--testnet',
590         help='''use the network's testnet''',
591         action='store_const', const=True, default=False, dest='testnet')
592     parser.add_argument('--debug',
593         help='enable debugging mode',
594         action='store_const', const=True, default=False, dest='debug')
595     parser.add_argument('-a', '--address',
596         help='generate payouts to this address (default: <address requested from bitcoind>)',
597         type=str, action='store', default=None, dest='address')
598     parser.add_argument('--datadir',
599         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
600         type=str, action='store', default=None, dest='datadir')
601     parser.add_argument('--logfile',
602         help='''log to this file (default: data/<NET>/log)''',
603         type=str, action='store', default=None, dest='logfile')
604     parser.add_argument('--merged',
605         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
606         type=str, action='append', default=[], dest='merged_urls')
607     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
608         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
609         type=float, action='store', default=0.5, dest='donation_percentage')
610     parser.add_argument('--iocp',
611         help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
612         action='store_true', default=False, dest='iocp')
613     parser.add_argument('--irc-announce',
614         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
615         action='store_true', default=False, dest='irc_announce')
616     parser.add_argument('--no-bugreport',
617         help='disable submitting caught exceptions to the author',
618         action='store_true', default=False, dest='no_bugreport')
619     
620     p2pool_group = parser.add_argument_group('p2pool interface')
621     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
622         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
623         type=int, action='store', default=None, dest='p2pool_port')
624     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
625         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
626         type=str, action='append', default=[], dest='p2pool_nodes')
627     parser.add_argument('--disable-upnp',
628         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
629         action='store_false', default=True, dest='upnp')
630     p2pool_group.add_argument('--max-conns', metavar='CONNS',
631         help='maximum incoming connections (default: 40)',
632         type=int, action='store', default=40, dest='p2pool_conns')
633     
634     worker_group = parser.add_argument_group('worker interface')
635     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
636         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
637         type=str, action='store', default=None, dest='worker_endpoint')
638     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
639         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
640         type=float, action='store', default=0, dest='worker_fee')
641     
642     bitcoind_group = parser.add_argument_group('bitcoind interface')
643     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
644         help='connect to this address (default: 127.0.0.1)',
645         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
646     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
647         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
648         type=int, action='store', default=None, dest='bitcoind_rpc_port')
649     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
650         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
651         type=int, action='store', default=None, dest='bitcoind_p2p_port')
652     
653     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
654         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
655         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
656     
657     args = parser.parse_args()
658     
659     if args.debug:
660         p2pool.DEBUG = True
661         defer.setDebugging(True)
662     
663     net_name = args.net_name + ('_testnet' if args.testnet else '')
664     net = networks.nets[net_name]
665     
666     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
667     if not os.path.exists(datadir_path):
668         os.makedirs(datadir_path)
669     
670     if len(args.bitcoind_rpc_userpass) > 2:
671         parser.error('a maximum of two arguments are allowed')
672     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
673     
674     if args.bitcoind_rpc_password is None:
675         conf_path = net.PARENT.CONF_FILE_FUNC()
676         if not os.path.exists(conf_path):
677             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
678                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
679                 '''\r\n'''
680                 '''server=1\r\n'''
681                 '''rpcpassword=%x\r\n'''
682                 '''\r\n'''
683                 '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
684         with open(conf_path, 'rb') as f:
685             cp = ConfigParser.RawConfigParser()
686             cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
687             for conf_name, var_name, var_type in [
688                 ('rpcuser', 'bitcoind_rpc_username', str),
689                 ('rpcpassword', 'bitcoind_rpc_password', str),
690                 ('rpcport', 'bitcoind_rpc_port', int),
691                 ('port', 'bitcoind_p2p_port', int),
692             ]:
693                 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
694                     setattr(args, var_name, var_type(cp.get('x', conf_name)))
695         if args.bitcoind_rpc_password is None:
696             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
697     
698     if args.bitcoind_rpc_username is None:
699         args.bitcoind_rpc_username = ''
700     
701     if args.bitcoind_rpc_port is None:
702         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
703     
704     if args.bitcoind_p2p_port is None:
705         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
706     
707     if args.p2pool_port is None:
708         args.p2pool_port = net.P2P_PORT
709     
710     if args.worker_endpoint is None:
711         worker_endpoint = '', net.WORKER_PORT
712     elif ':' not in args.worker_endpoint:
713         worker_endpoint = '', int(args.worker_endpoint)
714     else:
715         addr, port = args.worker_endpoint.rsplit(':', 1)
716         worker_endpoint = addr, int(port)
717     
718     if args.address is not None:
719         try:
720             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
721         except Exception, e:
722             parser.error('error parsing address: ' + repr(e))
723     else:
724         args.pubkey_hash = None
725     
726     def separate_url(url):
727         s = urlparse.urlsplit(url)
728         if '@' not in s.netloc:
729             parser.error('merged url netloc must contain an "@"')
730         userpass, new_netloc = s.netloc.rsplit('@', 1)
731         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
732     merged_urls = map(separate_url, args.merged_urls)
733     
734     if args.logfile is None:
735         args.logfile = os.path.join(datadir_path, 'log')
736     
737     logfile = logging.LogFile(args.logfile)
738     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
739     sys.stdout = logging.AbortPipe(pipe)
740     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
741     if hasattr(signal, "SIGUSR1"):
742         def sigusr1(signum, frame):
743             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
744             logfile.reopen()
745             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
746         signal.signal(signal.SIGUSR1, sigusr1)
747     task.LoopingCall(logfile.reopen).start(5)
748     
749     class ErrorReporter(object):
750         def __init__(self):
751             self.last_sent = None
752         
753         def emit(self, eventDict):
754             if not eventDict["isError"]:
755                 return
756             
757             if self.last_sent is not None and time.time() < self.last_sent + 5:
758                 return
759             self.last_sent = time.time()
760             
761             if 'failure' in eventDict:
762                 text = ((eventDict.get('why') or 'Unhandled Error')
763                     + '\n' + eventDict['failure'].getTraceback())
764             else:
765                 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
766             
767             from twisted.web import client
768             client.getPage(
769                 url='http://u.forre.st/p2pool_error.cgi',
770                 method='POST',
771                 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
772                 timeout=15,
773             ).addBoth(lambda x: None)
774     if not args.no_bugreport:
775         log.addObserver(ErrorReporter().emit)
776     
777     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
778     reactor.run()