Use min(bitcoin block version, 2) as block version instead of fixing it at 2 so BIP34...
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import base64
6 import json
7 import os
8 import random
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 if '--iocp' in sys.argv:
16     from twisted.internet import iocpreactor
17     iocpreactor.install()
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging
26 from . import p2p, networks, web, work
27 import p2pool, p2pool.data as p2pool_data
28
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch a block template from bitcoind's getmemorypool RPC and
    normalize it into the dict shape the rest of p2pool consumes
    (parsed transactions, merkle link, FloatingInteger bits, etc.).

    Retried up to 3 times by the decorator; a too-old bitcoind (no
    getmemorypool method) triggers a silent retry after warning.
    """
    try:
        template = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error as e:
        # -32601 == "Method not found": daemon predates getmemorypool
        if e.code != -32601:
            raise
        print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
        raise deferral.RetrySilentlyException()

    raw_txs = [tx_hex.decode('hex') for tx_hex in template['transactions']]

    # some daemons omit 'height'; derive it from the previous block
    if 'height' not in template:
        prev = yield bitcoind.rpc_getblock(template['previousblockhash'])
        template['height'] = prev['height'] + 1

    # 'bits' arrives either hex-encoded (byte-reversed) or as a raw integer
    if isinstance(template['bits'], (str, unicode)):
        bits = bitcoin_data.FloatingIntegerType().unpack(template['bits'].decode('hex')[::-1])
    else:
        bits = bitcoin_data.FloatingInteger(template['bits'])

    # coinbase flags come from 'coinbaseflags', or concatenated 'coinbaseaux'
    # values, or default to empty
    if 'coinbaseflags' in template:
        flags = template['coinbaseflags'].decode('hex')
    elif 'coinbaseaux' in template:
        flags = ''.join(aux.decode('hex') for aux in template['coinbaseaux'].itervalues())
    else:
        flags = ''

    defer.returnValue(dict(
        version=template['version'],
        previous_block=int(template['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, raw_txs),
        merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, raw_txs), 0),
        subsidy=template['coinbasevalue'],
        time=template['time'],
        bits=bits,
        coinbaseflags=flags,
        height=template['height'],
        clock_offset=time.time() - template['time'],
        last_update=time.time(),
    ))
55
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    """Bring up a complete p2pool node inside the running Twisted reactor.

    Sequence: check the bitcoind JSON-RPC and P2P connections, determine the
    payout address, load the on-disk share store, start work/header polling,
    join the p2pool P2P network, start the miner-facing worker interface,
    then leave periodic maintenance/status tasks running.

    args -- parsed command-line options (see run())
    net -- network definition object from p2pool.networks
    datadir_path -- directory for cached address, shares, peer addrs, flags
    merged_urls -- merged-mining daemon URLs, passed through to WorkerBridge
    worker_endpoint -- (interface, port) pair the worker server listens on
    """
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            # Verify we are talking to the expected coin daemon and that its
            # version passes the network's VERSION_CHECK; retried until success.
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
        temp_work = yield check()
        
        # parent-chain height, refreshed hourly via getblockcount
        block_height_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_height():
            block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
        yield poll_height()
        task.LoopingCall(poll_height).start(60*60)
        
        # bitcoind's getmininginfo 'errors' string (None when empty),
        # refreshed every 20 minutes and surfaced on the web interface
        bitcoind_warning_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_warnings():
            errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
            bitcoind_warning_var.set(errors if errors != '' else None)
        yield poll_warnings()
        task.LoopingCall(poll_warnings).start(20*60)
        
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block'],)
        print '    Current block height: %i' % (block_height_var.value,)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            # No address on the command line: use (and cache on disk) an
            # address from the local bitcoind wallet.
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                # confirm the cached address is still valid and wallet-owned
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
        # hashes of locally-produced shares (and those dead on arrival),
        # consulted later for stale-rate accounting
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set() # shares already relayed to peers
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.items:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.items) % 1000 == 0 and tracker.items:
                    print "    %i" % (len(tracker.items),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.items:
                # verified hash with no matching share on disk: drop it
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.items[h])
        print "    ...done loading %i shares!" % (len(tracker.items),)
        print
        # keep the on-disk store and relay bookkeeping in sync with the tracker
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        print 'Initializing work...'
        
        
        # BITCOIND WORK
        
        bitcoind_work = variable.Variable((yield getwork(bitcoind)))
        @defer.inlineCallbacks
        def work_poller():
            # refresh the block template when bitcoind announces a new block
            # over P2P, or at least every 15 seconds, forever
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    bitcoind_work.set((yield getwork(bitcoind)))
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        # PEER WORK
        
        best_block_header = variable.Variable(None)
        def handle_header(new_header):
            # check that header matches current target
            if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
                return
            bitcoind_best_block = bitcoind_work.value['previous_block']
            if (best_block_header.value is None
                or (
                    new_header['previous_block'] == bitcoind_best_block and
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
                ) # new is child of current and previous is current
                or (
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
                    best_block_header.value['previous_block'] != bitcoind_best_block
                )): # new is current and previous is not a child of current
                best_block_header.set(new_header)
        @defer.inlineCallbacks
        def poll_header():
            # pull the header of bitcoind's current tip whenever work changes
            handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
        bitcoind_work.changed.watch(lambda _: poll_header())
        yield deferral.retry('Error while requesting best block header:')(poll_header)()
        
        # BEST SHARE
        
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
        requested = expiring_dict.ExpiringDict(300)
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        best_share_var = variable.Variable(None)
        def set_best_share():
            # ask the tracker for the current best share, then request any
            # desired ancestor chains we are missing from suitable peers
            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
            
            best_share_var.set(best)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                # exponential backoff between repeated requests for one share
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        bitcoind_work.changed.watch(lambda _: set_best_share())
        set_best_share()
        
        
        print '    ...success!'
        print
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            # p2pool peer node specialized with share/header handling that
            # closes over the tracker and work state above
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.items:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_best_share()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                # request any announced shares we don't already have,
                # with the same backoff scheme as set_best_share()
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.items:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                # serve share chains to a peer, capping total work per request
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
            
            def handle_bestblock(self, header, peer):
                # reject headers whose PoW doesn't meet their claimed target
                if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
                    raise p2p.PeerMisbehavingError('received block header fails PoW test')
                handle_header(header)
        
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            # push a solved block to bitcoind over the P2P connection
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
        
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            # also submit via RPC (getmemorypool with data), and warn when the
            # result disagrees with whether the block's PoW met the target
            success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        def submit_block(block, ignore_failure):
            # submit through both channels for redundancy
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
        
        @tracker.verified.added.watch
        def _(share):
            # a newly verified share that meets the parent-chain target is a
            # full block: submit it and rebroadcast the share
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            # resolve "host[:port]" to an (ip, port) tuple, defaulting to the
            # network's standard P2P port
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        # seed the address store from disk plus the hard-coded bootstrap list
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs')):
            try:
                with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
                    addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
            except:
                print >>sys.stderr, 'error parsing addrs'
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: best_share_var.value,
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
        
        # persist known peer addresses every minute
        def save_addrs():
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
        task.LoopingCall(save_addrs).start(60)
        
        @best_block_header.changed.watch
        def _(header):
            # relay the new best parent-chain header to all peers
            for peer in p2p_node.peers.itervalues():
                peer.send_bestblock(header=header)
        
        @defer.inlineCallbacks
        def broadcast_share(share_hash):
            # send up to 5 not-yet-shared ancestors of share_hash to every
            # peer, skipping each share's originating peer
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in list(p2p_node.peers.itervalues()):
                yield peer.sendShares([share for share in shares if share.peer is not peer])
        
        # send share when the chain changes to their chain
        best_share_var.changed.watch(broadcast_share)
        
        # persist the active share chain (and verified markers) every minute
        def save_shares():
            for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.items:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                # periodically (re)establish a UPnP port mapping; best-effort,
                # errors only logged in debug mode
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
        
        wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
        web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var)
        worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        # touch a flag file so wrapper scripts can tell startup completed
        with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
        print
        
        
        if hasattr(signal, 'SIGALRM'):
            # watchdog: SIGALRM fires every 30s unless the reactor keeps
            # re-arming it; a stalled reactor dumps a stack trace to stderr
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                # announces found blocks on the network's IRC channel,
                # suppressing duplicates already seen in recent messages
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        # only announce fresh block-solving shares, after a
                        # random delay so multiple nodes don't all speak at once
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
        @defer.inlineCallbacks
        def status_thread():
            # every 3s build a status summary; print it when it changes or at
            # least every 15s
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    height = tracker.get_height(best_share_var.value)
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.items),
                        len(tracker.items),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = wb.local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                    )
                    
                    if height > 2:
                        # pool-wide statistics need a few shares of history
                        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
                        )
                        
                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        # any startup failure is fatal: stop the reactor, then log
        reactor.stop()
        log.err(None, 'Fatal error:')
580
def run():
    """Process entry point.
    
    Parses the command line, fills in any missing bitcoind credentials/ports
    from bitcoin.conf, sets up file logging and the optional bug reporter,
    then schedules main() and starts the Twisted reactor (blocks until exit).
    """
    # Mainnet networks only; their testnet variants are selected by combining
    # --net NAME with --testnet (see net_name construction below).
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    # fromfile_prefix_chars='@' lets users keep options in a file: p2pool @opts.txt
    parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--iocp',
        help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
        action='store_true', default=False, dest='iocp')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    parser.add_argument('--no-bugreport',
        help='disable submitting caught exceptions to the author',
        action='store_true', default=False, dest='no_bugreport')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    # NOTE(review): added via parser rather than p2pool_group, so it shows under
    # the top-level options in --help; parsing behavior is unaffected.
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # Positional: zero, one, or two values (username [password]); see the
    # normalization into bitcoind_rpc_username/bitcoind_rpc_password below.
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
        defer.setDebugging(True)
    
    # Select the concrete network object, switching to its testnet variant if requested.
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    # Per-network data directory, e.g. <script dir>/data/bitcoin; created if absent.
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    # Right-align into (username, password): [] -> (None, None),
    # [pass] -> (None, pass), [user, pass] -> (user, pass).
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    # No password given on the command line: fall back to reading bitcoin.conf.
    if args.bitcoind_rpc_password is None:
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x\r\n'''
                '''\r\n'''
                '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            # bitcoin.conf has no section headers, so prepend a dummy [x]
            # section to make it parseable by ConfigParser.
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            # Only fill in values the user didn't already supply explicitly.
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    # Remaining unset values fall back to per-network defaults.
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    # Normalize --worker-port into an (interface, port) tuple; '' means all interfaces.
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    # Validate the payout address now so a typo fails fast at startup.
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        # Split 'http://user:pass@host:port/' into ('http://host:port/', 'user:pass')
        # so credentials can be sent separately from the merged-mining URL.
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    # Redirect stdout/stderr through a timestamping tee that writes to both the
    # real stderr and the logfile; stderr lines get a '> ' prefix. Must happen
    # before any observers/printing below so all output is captured.
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    # SIGUSR1 (where available, i.e. not Windows) reopens the logfile to
    # cooperate with external log rotation.
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    # Also reopen every 5 seconds so a rotated/deleted logfile is recreated
    # even without a signal.
    task.LoopingCall(logfile.reopen).start(5)
    
    class ErrorReporter(object):
        """Twisted log observer that POSTs error events to the author's
        collection endpoint, rate-limited to at most one report per 5 seconds.
        Attached below only when --no-bugreport is not given."""
        def __init__(self):
            self.last_sent = None  # timestamp of last report, None until first send
        
        def emit(self, eventDict):
            # Only report error events, and at most one every 5 seconds.
            if not eventDict["isError"]:
                return
            
            if self.last_sent is not None and time.time() < self.last_sent + 5:
                return
            self.last_sent = time.time()
            
            # Prefer the failure's traceback; otherwise join the plain message parts.
            if 'failure' in eventDict:
                text = ((eventDict.get('why') or 'Unhandled Error')
                    + '\n' + eventDict['failure'].getTraceback())
            else:
                text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
            
            # Imported here to avoid the twisted.web.client import at module load.
            from twisted.web import client
            client.getPage(
                url='http://u.forre.st/p2pool_error.cgi',
                method='POST',
                postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
                timeout=15,
            ).addBoth(lambda x: None)  # fire-and-forget; ignore success or failure
    if not args.no_bugreport:
        log.addObserver(ErrorReporter().emit)
    
    # Hand off to main() once the reactor is up; reactor.run() blocks until shutdown.
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()