7c819b7ef3f3781de83fcfa25973b3d9ff42b8d1
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import base64
6 import json
7 import os
8 import random
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 if '--iocp' in sys.argv:
16     from twisted.internet import iocpreactor
17     iocpreactor.install()
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import fixargparse, jsonrpc, variable, deferral, math, logging
26 from . import p2p, networks, web, work
27 import p2pool, p2pool.data as p2pool_data
28
29 @deferral.retry('Error getting work from bitcoind:', 3)
30 @defer.inlineCallbacks
31 def getwork(bitcoind, use_getblocktemplate=False):
32     def go():
33         if use_getblocktemplate:
34             return bitcoind.rpc_getblocktemplate(dict(mode='template'))
35         else:
36             return bitcoind.rpc_getmemorypool()
37     try:
38         work = yield go()
39     except jsonrpc.Error_for_code(-32601): # Method not found
40         use_getblocktemplate = not use_getblocktemplate
41         try:
42             work = yield go()
43         except jsonrpc.Error_for_code(-32601): # Method not found
44             print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
45             raise deferral.RetrySilentlyException()
46     packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
47     if 'height' not in work:
48         work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
49     elif p2pool.DEBUG:
50         assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
51     defer.returnValue(dict(
52         version=work['version'],
53         previous_block=int(work['previousblockhash'], 16),
54         transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
55         merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
56         subsidy=work['coinbasevalue'],
57         time=work['time'] if 'time' in work else work['curtime'],
58         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
59         coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
60         height=work['height'],
61         last_update=time.time(),
62         use_getblocktemplate=use_getblocktemplate,
63     ))
64
65 @defer.inlineCallbacks
66 def main(args, net, datadir_path, merged_urls, worker_endpoint):
67     try:
68         print 'p2pool (version %s)' % (p2pool.__version__,)
69         print
70         
71         # connect to bitcoind over JSON-RPC and do initial getmemorypool
72         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
73         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
74         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
75         @deferral.retry('Error while checking Bitcoin connection:', 1)
76         @defer.inlineCallbacks
77         def check():
78             if not (yield net.PARENT.RPC_CHECK(bitcoind)):
79                 print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
80                 raise deferral.RetrySilentlyException()
81             if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
82                 print >>sys.stderr, '    Bitcoin version too old! Upgrade to 0.6.4 or newer!'
83                 raise deferral.RetrySilentlyException()
84         yield check()
85         temp_work = yield getwork(bitcoind)
86         
87         block_height_var = variable.Variable(None)
88         @defer.inlineCallbacks
89         def poll_height():
90             block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
91         yield poll_height()
92         task.LoopingCall(poll_height).start(60*60)
93         
94         bitcoind_warning_var = variable.Variable(None)
95         @defer.inlineCallbacks
96         def poll_warnings():
97             errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
98             bitcoind_warning_var.set(errors if errors != '' else None)
99         yield poll_warnings()
100         task.LoopingCall(poll_warnings).start(20*60)
101         
102         print '    ...success!'
103         print '    Current block hash: %x' % (temp_work['previous_block'],)
104         print '    Current block height: %i' % (block_height_var.value,)
105         print
106         
107         # connect to bitcoind over bitcoin-p2p
108         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
109         factory = bitcoin_p2p.ClientFactory(net.PARENT)
110         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
111         yield factory.getProtocol() # waits until handshake is successful
112         print '    ...success!'
113         print
114         
115         print 'Determining payout address...'
116         if args.pubkey_hash is None:
117             address_path = os.path.join(datadir_path, 'cached_payout_address')
118             
119             if os.path.exists(address_path):
120                 with open(address_path, 'rb') as f:
121                     address = f.read().strip('\r\n')
122                 print '    Loaded cached address: %s...' % (address,)
123             else:
124                 address = None
125             
126             if address is not None:
127                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
128                 if not res['isvalid'] or not res['ismine']:
129                     print '    Cached address is either invalid or not controlled by local bitcoind!'
130                     address = None
131             
132             if address is None:
133                 print '    Getting payout address from bitcoind...'
134                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
135             
136             with open(address_path, 'wb') as f:
137                 f.write(address)
138             
139             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
140         else:
141             my_pubkey_hash = args.pubkey_hash
142         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
143         print
144         
145         my_share_hashes = set()
146         my_doa_share_hashes = set()
147         
148         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
149         shared_share_hashes = set()
150         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
151         known_verified = set()
152         print "Loading shares..."
153         for i, (mode, contents) in enumerate(ss.get_shares()):
154             if mode == 'share':
155                 if contents.hash in tracker.items:
156                     continue
157                 shared_share_hashes.add(contents.hash)
158                 contents.time_seen = 0
159                 tracker.add(contents)
160                 if len(tracker.items) % 1000 == 0 and tracker.items:
161                     print "    %i" % (len(tracker.items),)
162             elif mode == 'verified_hash':
163                 known_verified.add(contents)
164             else:
165                 raise AssertionError()
166         print "    ...inserting %i verified shares..." % (len(known_verified),)
167         for h in known_verified:
168             if h not in tracker.items:
169                 ss.forget_verified_share(h)
170                 continue
171             tracker.verified.add(tracker.items[h])
172         print "    ...done loading %i shares!" % (len(tracker.items),)
173         print
174         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
175         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
176         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
177         
178         print 'Initializing work...'
179         
180         
181         # BITCOIND WORK
182         
183         bitcoind_work = variable.Variable((yield getwork(bitcoind)))
184         @defer.inlineCallbacks
185         def work_poller():
186             while True:
187                 flag = factory.new_block.get_deferred()
188                 try:
189                     bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
190                 except:
191                     log.err()
192                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
193         work_poller()
194         
195         # PEER WORK
196         
197         best_block_header = variable.Variable(None)
198         def handle_header(new_header):
199             # check that header matches current target
200             if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
201                 return
202             bitcoind_best_block = bitcoind_work.value['previous_block']
203             if (best_block_header.value is None
204                 or (
205                     new_header['previous_block'] == bitcoind_best_block and
206                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
207                 ) # new is child of current and previous is current
208                 or (
209                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
210                     best_block_header.value['previous_block'] != bitcoind_best_block
211                 )): # new is current and previous is not a child of current
212                 best_block_header.set(new_header)
213         @defer.inlineCallbacks
214         def poll_header():
215             handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
216         bitcoind_work.changed.watch(lambda _: poll_header())
217         yield deferral.retry('Error while requesting best block header:')(poll_header)()
218         
219         # BEST SHARE
220         
221         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
222         
223         best_share_var = variable.Variable(None)
224         desired_var = variable.Variable(None)
225         def set_best_share():
226             best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
227             
228             best_share_var.set(best)
229             desired_var.set(desired)
230         bitcoind_work.changed.watch(lambda _: set_best_share())
231         set_best_share()
232         
233         print '    ...success!'
234         print
235         
236         # setup p2p logic and join p2pool network
237         
238         class Node(p2p.Node):
239             def handle_shares(self, shares, peer):
240                 if len(shares) > 5:
241                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
242                 
243                 new_count = 0
244                 for share in shares:
245                     if share.hash in tracker.items:
246                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
247                         continue
248                     
249                     new_count += 1
250                     
251                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
252                     
253                     tracker.add(share)
254                 
255                 if new_count:
256                     set_best_share()
257                 
258                 if len(shares) > 5:
259                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
260             
261             @defer.inlineCallbacks
262             def handle_share_hashes(self, hashes, peer):
263                 new_hashes = [x for x in hashes if x not in tracker.items]
264                 if not new_hashes:
265                     return
266                 try:
267                     shares = yield peer.get_shares(
268                         hashes=new_hashes,
269                         parents=0,
270                         stops=[],
271                     )
272                 except:
273                     log.err(None, 'in handle_share_hashes:')
274                 else:
275                     self.handle_shares(shares, peer)
276             
277             def handle_get_shares(self, hashes, parents, stops, peer):
278                 parents = min(parents, 1000//len(hashes))
279                 stops = set(stops)
280                 shares = []
281                 for share_hash in hashes:
282                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
283                         if share.hash in stops:
284                             break
285                         shares.append(share)
286                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
287                 return shares
288             
289             def handle_bestblock(self, header, peer):
290                 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
291                     raise p2p.PeerMisbehavingError('received block header fails PoW test')
292                 handle_header(header)
293         
294         @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
295         def submit_block_p2p(block):
296             if factory.conn.value is None:
297                 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
298                 raise deferral.RetrySilentlyException()
299             factory.conn.value.send_block(block=block)
300         
301         @deferral.retry('Error submitting block: (will retry)', 10, 10)
302         @defer.inlineCallbacks
303         def submit_block_rpc(block, ignore_failure):
304             if bitcoind_work.value['use_getblocktemplate']:
305                 result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
306                 success = result is None
307             else:
308                 result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
309                 success = result
310             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
311             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
312                 print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
313         
314         def submit_block(block, ignore_failure):
315             submit_block_p2p(block)
316             submit_block_rpc(block, ignore_failure)
317         
318         @tracker.verified.added.watch
319         def _(share):
320             if share.pow_hash <= share.header['bits'].target:
321                 submit_block(share.as_block(tracker), ignore_failure=True)
322                 print
323                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
324                 print
325                 def spread():
326                     if (get_height_rel_highest(share.header['previous_block']) > -5 or
327                         bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
328                         broadcast_share(share.hash)
329                 spread()
330                 reactor.callLater(5, spread) # so get_height_rel_highest can update
331         
332         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
333         
334         @defer.inlineCallbacks
335         def parse(x):
336             if ':' in x:
337                 ip, port = x.split(':')
338                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
339             else:
340                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
341         
342         addrs = {}
343         if os.path.exists(os.path.join(datadir_path, 'addrs')):
344             try:
345                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
346                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
347             except:
348                 print >>sys.stderr, 'error parsing addrs'
349         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
350             try:
351                 addr = yield addr_df
352                 if addr not in addrs:
353                     addrs[addr] = (0, time.time(), time.time())
354             except:
355                 log.err()
356         
357         connect_addrs = set()
358         for addr_df in map(parse, args.p2pool_nodes):
359             try:
360                 connect_addrs.add((yield addr_df))
361             except:
362                 log.err()
363         
364         p2p_node = Node(
365             best_share_hash_func=lambda: best_share_var.value,
366             port=args.p2pool_port,
367             net=net,
368             addr_store=addrs,
369             connect_addrs=connect_addrs,
370             max_incoming_conns=args.p2pool_conns,
371         )
372         p2p_node.start()
373         
374         def save_addrs():
375             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
376                 f.write(json.dumps(p2p_node.addr_store.items()))
377         task.LoopingCall(save_addrs).start(60)
378         
379         @best_block_header.changed.watch
380         def _(header):
381             for peer in p2p_node.peers.itervalues():
382                 peer.send_bestblock(header=header)
383         
384         @defer.inlineCallbacks
385         def broadcast_share(share_hash):
386             shares = []
387             for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
388                 if share.hash in shared_share_hashes:
389                     break
390                 shared_share_hashes.add(share.hash)
391                 shares.append(share)
392             
393             for peer in list(p2p_node.peers.itervalues()):
394                 yield peer.sendShares([share for share in shares if share.peer is not peer])
395         
396         # send share when the chain changes to their chain
397         best_share_var.changed.watch(broadcast_share)
398         
399         def save_shares():
400             for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
401                 ss.add_share(share)
402                 if share.hash in tracker.verified.items:
403                     ss.add_verified_hash(share.hash)
404         task.LoopingCall(save_shares).start(60)
405         
406         @apply
407         @defer.inlineCallbacks
408         def download_shares():
409             while True:
410                 desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
411                 peer2, share_hash = random.choice(desired)
412                 
413                 if len(p2p_node.peers) == 0:
414                     yield deferral.sleep(1)
415                     continue
416                 peer = random.choice(p2p_node.peers.values())
417                 
418                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
419                 try:
420                     shares = yield peer.get_shares(
421                         hashes=[share_hash],
422                         parents=500,
423                         stops=[],
424                     )
425                 except:
426                     log.err(None, 'in download_shares:')
427                 else:
428                     p2p_node.handle_shares(shares, peer)
429         
430         print '    ...success!'
431         print
432         
433         if args.upnp:
434             @defer.inlineCallbacks
435             def upnp_thread():
436                 while True:
437                     try:
438                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
439                         if is_lan:
440                             pm = yield portmapper.get_port_mapper()
441                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
442                     except defer.TimeoutError:
443                         pass
444                     except:
445                         if p2pool.DEBUG:
446                             log.err(None, 'UPnP error:')
447                     yield deferral.sleep(random.expovariate(1/120))
448             upnp_thread()
449         
450         # start listening for workers with a JSON-RPC server
451         
452         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
453         
454         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
455         
456         wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
457         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var)
458         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
459         
460         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
461         
462         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
463             pass
464         
465         print '    ...success!'
466         print
467         
468         
469         # done!
470         print 'Started successfully!'
471         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
472         if args.donation_percentage > 0.51:
473             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
474         elif args.donation_percentage < 0.49:
475             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
476         else:
477             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
478             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
479         print
480         
481         
482         if hasattr(signal, 'SIGALRM'):
483             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
484                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
485             ))
486             signal.siginterrupt(signal.SIGALRM, False)
487             task.LoopingCall(signal.alarm, 30).start(1)
488         
489         if args.irc_announce:
490             from twisted.words.protocols import irc
491             class IRCClient(irc.IRCClient):
492                 nickname = 'p2pool%02i' % (random.randrange(100),)
493                 channel = net.ANNOUNCE_CHANNEL
494                 def lineReceived(self, line):
495                     if p2pool.DEBUG:
496                         print repr(line)
497                     irc.IRCClient.lineReceived(self, line)
498                 def signedOn(self):
499                     irc.IRCClient.signedOn(self)
500                     self.factory.resetDelay()
501                     self.join(self.channel)
502                     @defer.inlineCallbacks
503                     def new_share(share):
504                         if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
505                             yield deferral.sleep(random.expovariate(1/60))
506                             message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
507                             if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
508                                 self.say(self.channel, message)
509                                 self._remember_message(message)
510                     self.watch_id = tracker.verified.added.watch(new_share)
511                     self.recent_messages = []
512                 def _remember_message(self, message):
513                     self.recent_messages.append(message)
514                     while len(self.recent_messages) > 100:
515                         self.recent_messages.pop(0)
516                 def privmsg(self, user, channel, message):
517                     if channel == self.channel:
518                         self._remember_message(message)
519                 def connectionLost(self, reason):
520                     tracker.verified.added.unwatch(self.watch_id)
521                     print 'IRC connection lost:', reason.getErrorMessage()
522             class IRCClientFactory(protocol.ReconnectingClientFactory):
523                 protocol = IRCClient
524             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
525         
526         @defer.inlineCallbacks
527         def status_thread():
528             last_str = None
529             last_time = 0
530             while True:
531                 yield deferral.sleep(3)
532                 try:
533                     height = tracker.get_height(best_share_var.value)
534                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
535                         height,
536                         len(tracker.verified.items),
537                         len(tracker.items),
538                         len(p2p_node.peers),
539                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
540                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
541                     
542                     datums, dt = wb.local_rate_monitor.get_datums_in_last()
543                     my_att_s = sum(datum['work']/dt for datum in datums)
544                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
545                         math.format(int(my_att_s)),
546                         math.format_dt(dt),
547                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
548                         math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
549                     )
550                     
551                     if height > 2:
552                         (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
553                         stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
554                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
555                         
556                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
557                             shares, stale_orphan_shares, stale_doa_shares,
558                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
559                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
560                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
561                         )
562                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
563                             math.format(int(real_att_s)),
564                             100*stale_prop,
565                             math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
566                         )
567                         
568                         for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
569                             print >>sys.stderr, '#'*40
570                             print >>sys.stderr, '>>> Warning: ' + warning
571                             print >>sys.stderr, '#'*40
572                     
573                     if this_str != last_str or time.time() > last_time + 15:
574                         print this_str
575                         last_str = this_str
576                         last_time = time.time()
577                 except:
578                     log.err()
579         status_thread()
580     except:
581         reactor.stop()
582         log.err(None, 'Fatal error:')
583
584 def run():
585     realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
586     
587     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
588     parser.add_argument('--version', action='version', version=p2pool.__version__)
589     parser.add_argument('--net',
590         help='use specified network (default: bitcoin)',
591         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
592     parser.add_argument('--testnet',
593         help='''use the network's testnet''',
594         action='store_const', const=True, default=False, dest='testnet')
595     parser.add_argument('--debug',
596         help='enable debugging mode',
597         action='store_const', const=True, default=False, dest='debug')
598     parser.add_argument('-a', '--address',
599         help='generate payouts to this address (default: <address requested from bitcoind>)',
600         type=str, action='store', default=None, dest='address')
601     parser.add_argument('--datadir',
602         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
603         type=str, action='store', default=None, dest='datadir')
604     parser.add_argument('--logfile',
605         help='''log to this file (default: data/<NET>/log)''',
606         type=str, action='store', default=None, dest='logfile')
607     parser.add_argument('--merged',
608         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
609         type=str, action='append', default=[], dest='merged_urls')
610     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
611         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
612         type=float, action='store', default=0.5, dest='donation_percentage')
613     parser.add_argument('--iocp',
614         help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
615         action='store_true', default=False, dest='iocp')
616     parser.add_argument('--irc-announce',
617         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
618         action='store_true', default=False, dest='irc_announce')
619     parser.add_argument('--no-bugreport',
620         help='disable submitting caught exceptions to the author',
621         action='store_true', default=False, dest='no_bugreport')
622     
623     p2pool_group = parser.add_argument_group('p2pool interface')
624     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
625         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
626         type=int, action='store', default=None, dest='p2pool_port')
627     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
628         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
629         type=str, action='append', default=[], dest='p2pool_nodes')
630     parser.add_argument('--disable-upnp',
631         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
632         action='store_false', default=True, dest='upnp')
633     p2pool_group.add_argument('--max-conns', metavar='CONNS',
634         help='maximum incoming connections (default: 40)',
635         type=int, action='store', default=40, dest='p2pool_conns')
636     
637     worker_group = parser.add_argument_group('worker interface')
638     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
639         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
640         type=str, action='store', default=None, dest='worker_endpoint')
641     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
642         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
643         type=float, action='store', default=0, dest='worker_fee')
644     
645     bitcoind_group = parser.add_argument_group('bitcoind interface')
646     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
647         help='connect to this address (default: 127.0.0.1)',
648         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
649     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
650         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
651         type=int, action='store', default=None, dest='bitcoind_rpc_port')
652     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
653         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
654         type=int, action='store', default=None, dest='bitcoind_p2p_port')
655     
656     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
657         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
658         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
659     
660     args = parser.parse_args()
661     
662     if args.debug:
663         p2pool.DEBUG = True
664         defer.setDebugging(True)
665     
666     net_name = args.net_name + ('_testnet' if args.testnet else '')
667     net = networks.nets[net_name]
668     
669     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
670     if not os.path.exists(datadir_path):
671         os.makedirs(datadir_path)
672     
673     if len(args.bitcoind_rpc_userpass) > 2:
674         parser.error('a maximum of two arguments are allowed')
675     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
676     
677     if args.bitcoind_rpc_password is None:
678         conf_path = net.PARENT.CONF_FILE_FUNC()
679         if not os.path.exists(conf_path):
680             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
681                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
682                 '''\r\n'''
683                 '''server=1\r\n'''
684                 '''rpcpassword=%x\r\n'''
685                 '''\r\n'''
686                 '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
687         with open(conf_path, 'rb') as f:
688             cp = ConfigParser.RawConfigParser()
689             cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
690             for conf_name, var_name, var_type in [
691                 ('rpcuser', 'bitcoind_rpc_username', str),
692                 ('rpcpassword', 'bitcoind_rpc_password', str),
693                 ('rpcport', 'bitcoind_rpc_port', int),
694                 ('port', 'bitcoind_p2p_port', int),
695             ]:
696                 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
697                     setattr(args, var_name, var_type(cp.get('x', conf_name)))
698         if args.bitcoind_rpc_password is None:
699             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
700     
701     if args.bitcoind_rpc_username is None:
702         args.bitcoind_rpc_username = ''
703     
704     if args.bitcoind_rpc_port is None:
705         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
706     
707     if args.bitcoind_p2p_port is None:
708         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
709     
710     if args.p2pool_port is None:
711         args.p2pool_port = net.P2P_PORT
712     
713     if args.worker_endpoint is None:
714         worker_endpoint = '', net.WORKER_PORT
715     elif ':' not in args.worker_endpoint:
716         worker_endpoint = '', int(args.worker_endpoint)
717     else:
718         addr, port = args.worker_endpoint.rsplit(':', 1)
719         worker_endpoint = addr, int(port)
720     
721     if args.address is not None:
722         try:
723             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
724         except Exception, e:
725             parser.error('error parsing address: ' + repr(e))
726     else:
727         args.pubkey_hash = None
728     
729     def separate_url(url):
730         s = urlparse.urlsplit(url)
731         if '@' not in s.netloc:
732             parser.error('merged url netloc must contain an "@"')
733         userpass, new_netloc = s.netloc.rsplit('@', 1)
734         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
735     merged_urls = map(separate_url, args.merged_urls)
736     
737     if args.logfile is None:
738         args.logfile = os.path.join(datadir_path, 'log')
739     
740     logfile = logging.LogFile(args.logfile)
741     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
742     sys.stdout = logging.AbortPipe(pipe)
743     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
744     if hasattr(signal, "SIGUSR1"):
745         def sigusr1(signum, frame):
746             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
747             logfile.reopen()
748             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
749         signal.signal(signal.SIGUSR1, sigusr1)
750     task.LoopingCall(logfile.reopen).start(5)
751     
752     class ErrorReporter(object):
753         def __init__(self):
754             self.last_sent = None
755         
756         def emit(self, eventDict):
757             if not eventDict["isError"]:
758                 return
759             
760             if self.last_sent is not None and time.time() < self.last_sent + 5:
761                 return
762             self.last_sent = time.time()
763             
764             if 'failure' in eventDict:
765                 text = ((eventDict.get('why') or 'Unhandled Error')
766                     + '\n' + eventDict['failure'].getTraceback())
767             else:
768                 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
769             
770             from twisted.web import client
771             client.getPage(
772                 url='http://u.forre.st/p2pool_error.cgi',
773                 method='POST',
774                 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
775                 timeout=15,
776             ).addBoth(lambda x: None)
777     if not args.no_bugreport:
778         log.addObserver(ErrorReporter().emit)
779     
780     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
781     reactor.run()