some pylint fixes
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import base64
6 import json
7 import os
8 import random
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 if '--iocp' in sys.argv:
16     from twisted.internet import iocpreactor
17     iocpreactor.install()
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging
26 from . import p2p, networks, web, work
27 import p2pool, p2pool.data as p2pool_data
28
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch a block template from bitcoind and normalise it into a dict.

    Calls ``bitcoind.rpc_getmemorypool()`` and fires with a dict holding
    ``version``, ``previous_block_hash``, ``transactions``, ``merkle_link``,
    ``subsidy``, ``time``, ``bits`` and ``coinbaseflags``.

    A JSON-RPC error with code -32601 (method not found) prints an upgrade
    hint and raises ``deferral.RetrySilentlyException``; any other
    ``jsonrpc.Error`` is re-raised unchanged.
    """
    try:
        template = yield bitcoind.rpc_getmemorypool()
    except jsonrpc.Error as err:
        if err.code != -32601:
            raise
        # -32601: Method not found
        print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
        raise deferral.RetrySilentlyException()

    raw_txs = [tx_hex.decode('hex') for tx_hex in template['transactions']]

    result = dict(
        version=template['version'],
        previous_block_hash=int(template['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, raw_txs),
        # the leading None is a placeholder entry at index 0 of the hash list
        merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, raw_txs), 0),
        subsidy=template['coinbasevalue'],
        time=template['time'],
    )

    # 'bits' arrives either as a hex string (byte-reversed before unpacking)
    # or as a value passed straight to FloatingInteger
    if isinstance(template['bits'], (str, unicode)):
        result['bits'] = bitcoin_data.FloatingIntegerType().unpack(template['bits'].decode('hex')[::-1])
    else:
        result['bits'] = bitcoin_data.FloatingInteger(template['bits'])

    # prefer an explicit 'coinbaseflags', fall back to joining 'coinbaseaux'
    if 'coinbaseflags' in template:
        result['coinbaseflags'] = template['coinbaseflags'].decode('hex')
    elif 'coinbaseaux' in template:
        result['coinbaseflags'] = ''.join(part.decode('hex') for part in template['coinbaseaux'].itervalues())
    else:
        result['coinbaseflags'] = ''

    defer.returnValue(result)
50
51 @defer.inlineCallbacks
52 def main(args, net, datadir_path, merged_urls, worker_endpoint):
53     try:
54         print 'p2pool (version %s)' % (p2pool.__version__,)
55         print
56         
57         # connect to bitcoind over JSON-RPC and do initial getmemorypool
58         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
59         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
60         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
61         @deferral.retry('Error while checking Bitcoin connection:', 1)
62         @defer.inlineCallbacks
63         def check():
64             if not (yield net.PARENT.RPC_CHECK)(bitcoind):
65                 print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
66                 raise deferral.RetrySilentlyException()
67             temp_work = yield getwork(bitcoind)
68             if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
69                 print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
70                 raise deferral.RetrySilentlyException()
71             defer.returnValue(temp_work)
72         temp_work = yield check()
73         
74         block_height_var = variable.Variable(None)
75         @defer.inlineCallbacks
76         def poll_height():
77             block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
78         yield poll_height()
79         task.LoopingCall(poll_height).start(60*60)
80         
81         print '    ...success!'
82         print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
83         print '    Current block height: %i' % (block_height_var.value,)
84         print
85         
86         # connect to bitcoind over bitcoin-p2p
87         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
88         factory = bitcoin_p2p.ClientFactory(net.PARENT)
89         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
90         yield factory.getProtocol() # waits until handshake is successful
91         print '    ...success!'
92         print
93         
94         print 'Determining payout address...'
95         if args.pubkey_hash is None:
96             address_path = os.path.join(datadir_path, 'cached_payout_address')
97             
98             if os.path.exists(address_path):
99                 with open(address_path, 'rb') as f:
100                     address = f.read().strip('\r\n')
101                 print '    Loaded cached address: %s...' % (address,)
102             else:
103                 address = None
104             
105             if address is not None:
106                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
107                 if not res['isvalid'] or not res['ismine']:
108                     print '    Cached address is either invalid or not controlled by local bitcoind!'
109                     address = None
110             
111             if address is None:
112                 print '    Getting payout address from bitcoind...'
113                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
114             
115             with open(address_path, 'wb') as f:
116                 f.write(address)
117             
118             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
119         else:
120             my_pubkey_hash = args.pubkey_hash
121         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
122         print
123         
124         my_share_hashes = set()
125         my_doa_share_hashes = set()
126         
127         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
128         shared_share_hashes = set()
129         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
130         known_verified = set()
131         print "Loading shares..."
132         for i, (mode, contents) in enumerate(ss.get_shares()):
133             if mode == 'share':
134                 if contents.hash in tracker.shares:
135                     continue
136                 shared_share_hashes.add(contents.hash)
137                 contents.time_seen = 0
138                 tracker.add(contents)
139                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
140                     print "    %i" % (len(tracker.shares),)
141             elif mode == 'verified_hash':
142                 known_verified.add(contents)
143             else:
144                 raise AssertionError()
145         print "    ...inserting %i verified shares..." % (len(known_verified),)
146         for h in known_verified:
147             if h not in tracker.shares:
148                 ss.forget_verified_share(h)
149                 continue
150             tracker.verified.add(tracker.shares[h])
151         print "    ...done loading %i shares!" % (len(tracker.shares),)
152         print
153         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
154         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
155         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
156         
157         print 'Initializing work...'
158         
159         
160         # BITCOIND WORK
161         
162         bitcoind_work = variable.Variable(None)
163         
164         @defer.inlineCallbacks
165         def poll_bitcoind():
166             work = yield getwork(bitcoind)
167             bitcoind_work.set(dict(
168                 version=work['version'],
169                 previous_block=work['previous_block_hash'],
170                 bits=work['bits'],
171                 coinbaseflags=work['coinbaseflags'],
172                 time=work['time'],
173                 transactions=work['transactions'],
174                 merkle_link=work['merkle_link'],
175                 subsidy=work['subsidy'],
176                 clock_offset=time.time() - work['time'],
177                 last_update=time.time(),
178             ))
179         yield poll_bitcoind()
180         
181         @defer.inlineCallbacks
182         def work_poller():
183             while True:
184                 flag = factory.new_block.get_deferred()
185                 try:
186                     yield poll_bitcoind()
187                 except:
188                     log.err()
189                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
190         work_poller()
191         
192         # PEER WORK
193         
194         best_block_header = variable.Variable(None)
195         def handle_header(new_header):
196             # check that header matches current target
197             if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
198                 return
199             bitcoind_best_block = bitcoind_work.value['previous_block']
200             if (best_block_header.value is None
201                 or (
202                     new_header['previous_block'] == bitcoind_best_block and
203                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
204                 ) # new is child of current and previous is current
205                 or (
206                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
207                     best_block_header.value['previous_block'] != bitcoind_best_block
208                 )): # new is current and previous is not a child of current
209                 best_block_header.set(new_header)
210         @defer.inlineCallbacks
211         def poll_header():
212             handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
213         bitcoind_work.changed.watch(lambda _: poll_header())
214         yield poll_header()
215         
216         # BEST SHARE
217         
218         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
219         requested = expiring_dict.ExpiringDict(300)
220         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
221         
222         best_share_var = variable.Variable(None)
223         def set_best_share():
224             best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
225             
226             best_share_var.set(best)
227             
228             t = time.time()
229             for peer2, share_hash in desired:
230                 if share_hash not in tracker.tails: # was received in the time tracker.think was running
231                     continue
232                 last_request_time, count = requested.get(share_hash, (None, 0))
233                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
234                     continue
235                 potential_peers = set()
236                 for head in tracker.tails[share_hash]:
237                     potential_peers.update(peer_heads.get(head, set()))
238                 potential_peers = [peer for peer in potential_peers if peer.connected2]
239                 if count == 0 and peer2 is not None and peer2.connected2:
240                     peer = peer2
241                 else:
242                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
243                     if peer is None:
244                         continue
245                 
246                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
247                 peer.send_getshares(
248                     hashes=[share_hash],
249                     parents=2000,
250                     stops=list(set(tracker.heads) | set(
251                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
252                     ))[:100],
253                 )
254                 requested[share_hash] = t, count + 1
255         bitcoind_work.changed.watch(lambda _: set_best_share())
256         set_best_share()
257         
258         
259         print '    ...success!'
260         print
261         
262         # setup p2p logic and join p2pool network
263         
264         class Node(p2p.Node):
265             def handle_shares(self, shares, peer):
266                 if len(shares) > 5:
267                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
268                 
269                 new_count = 0
270                 for share in shares:
271                     if share.hash in tracker.shares:
272                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
273                         continue
274                     
275                     new_count += 1
276                     
277                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
278                     
279                     tracker.add(share)
280                 
281                 if shares and peer is not None:
282                     peer_heads.setdefault(shares[0].hash, set()).add(peer)
283                 
284                 if new_count:
285                     set_best_share()
286                 
287                 if len(shares) > 5:
288                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
289             
290             def handle_share_hashes(self, hashes, peer):
291                 t = time.time()
292                 get_hashes = []
293                 for share_hash in hashes:
294                     if share_hash in tracker.shares:
295                         continue
296                     last_request_time, count = requested.get(share_hash, (None, 0))
297                     if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
298                         continue
299                     print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
300                     get_hashes.append(share_hash)
301                     requested[share_hash] = t, count + 1
302                 
303                 if hashes and peer is not None:
304                     peer_heads.setdefault(hashes[0], set()).add(peer)
305                 if get_hashes:
306                     peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
307             
308             def handle_get_shares(self, hashes, parents, stops, peer):
309                 parents = min(parents, 1000//len(hashes))
310                 stops = set(stops)
311                 shares = []
312                 for share_hash in hashes:
313                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
314                         if share.hash in stops:
315                             break
316                         shares.append(share)
317                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
318                 return shares
319             
320             def handle_bestblock(self, header, peer):
321                 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
322                     raise p2p.PeerMisbehavingError('received block header fails PoW test')
323                 handle_header(header)
324         
325         @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
326         def submit_block_p2p(block):
327             if factory.conn.value is None:
328                 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
329                 raise deferral.RetrySilentlyException()
330             factory.conn.value.send_block(block=block)
331         
332         @deferral.retry('Error submitting block: (will retry)', 10, 10)
333         @defer.inlineCallbacks
334         def submit_block_rpc(block, ignore_failure):
335             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
336             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
337             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
338                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
339         
340         def submit_block(block, ignore_failure):
341             submit_block_p2p(block)
342             submit_block_rpc(block, ignore_failure)
343         
344         @tracker.verified.added.watch
345         def _(share):
346             if share.pow_hash <= share.header['bits'].target:
347                 submit_block(share.as_block(tracker), ignore_failure=True)
348                 print
349                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
350                 print
351                 def spread():
352                     if (get_height_rel_highest(share.header['previous_block']) > -5 or
353                         bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
354                         broadcast_share(share.hash)
355                 spread()
356                 reactor.callLater(5, spread) # so get_height_rel_highest can update
357         
358         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
359         
360         @defer.inlineCallbacks
361         def parse(x):
362             if ':' in x:
363                 ip, port = x.split(':')
364                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
365             else:
366                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
367         
368         addrs = {}
369         if os.path.exists(os.path.join(datadir_path, 'addrs')):
370             try:
371                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
372                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
373             except:
374                 print >>sys.stderr, 'error parsing addrs'
375         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
376             try:
377                 addr = yield addr_df
378                 if addr not in addrs:
379                     addrs[addr] = (0, time.time(), time.time())
380             except:
381                 log.err()
382         
383         connect_addrs = set()
384         for addr_df in map(parse, args.p2pool_nodes):
385             try:
386                 connect_addrs.add((yield addr_df))
387             except:
388                 log.err()
389         
390         p2p_node = Node(
391             best_share_hash_func=lambda: best_share_var.value,
392             port=args.p2pool_port,
393             net=net,
394             addr_store=addrs,
395             connect_addrs=connect_addrs,
396             max_incoming_conns=args.p2pool_conns,
397         )
398         p2p_node.start()
399         
400         def save_addrs():
401             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
402                 f.write(json.dumps(p2p_node.addr_store.items()))
403         task.LoopingCall(save_addrs).start(60)
404         
405         @best_block_header.changed.watch
406         def _(header):
407             for peer in p2p_node.peers.itervalues():
408                 peer.send_bestblock(header=header)
409         
410         def broadcast_share(share_hash):
411             shares = []
412             for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
413                 if share.hash in shared_share_hashes:
414                     break
415                 shared_share_hashes.add(share.hash)
416                 shares.append(share)
417             
418             for peer in p2p_node.peers.itervalues():
419                 peer.sendShares([share for share in shares if share.peer is not peer])
420         
421         # send share when the chain changes to their chain
422         best_share_var.changed.watch(broadcast_share)
423         
424         def save_shares():
425             for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
426                 ss.add_share(share)
427                 if share.hash in tracker.verified.shares:
428                     ss.add_verified_hash(share.hash)
429         task.LoopingCall(save_shares).start(60)
430         
431         print '    ...success!'
432         print
433         
434         if args.upnp:
435             @defer.inlineCallbacks
436             def upnp_thread():
437                 while True:
438                     try:
439                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
440                         if is_lan:
441                             pm = yield portmapper.get_port_mapper()
442                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
443                     except defer.TimeoutError:
444                         pass
445                     except:
446                         if p2pool.DEBUG:
447                             log.err(None, 'UPnP error:')
448                     yield deferral.sleep(random.expovariate(1/120))
449             upnp_thread()
450         
451         # start listening for workers with a JSON-RPC server
452         
453         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
454         
455         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
456         
457         wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, shared_share_hashes, block_height_var)
458         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var)
459         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
460         
461         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
462         
463         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
464             pass
465         
466         print '    ...success!'
467         print
468         
469         
470         # done!
471         print 'Started successfully!'
472         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
473         if args.donation_percentage > 0.51:
474             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
475         elif args.donation_percentage < 0.49:
476             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
477         else:
478             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
479             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
480         print
481         
482         
483         if hasattr(signal, 'SIGALRM'):
484             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
485                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
486             ))
487             signal.siginterrupt(signal.SIGALRM, False)
488             task.LoopingCall(signal.alarm, 30).start(1)
489         
490         if args.irc_announce:
491             from twisted.words.protocols import irc
492             class IRCClient(irc.IRCClient):
493                 nickname = 'p2pool%02i' % (random.randrange(100),)
494                 channel = net.ANNOUNCE_CHANNEL
495                 def lineReceived(self, line):
496                     if p2pool.DEBUG:
497                         print repr(line)
498                     irc.IRCClient.lineReceived(self, line)
499                 def signedOn(self):
500                     irc.IRCClient.signedOn(self)
501                     self.factory.resetDelay()
502                     self.join(self.channel)
503                     @defer.inlineCallbacks
504                     def new_share(share):
505                         if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
506                             yield deferral.sleep(random.expovariate(1/60))
507                             message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
508                             if message not in self.recent_messages:
509                                 self.say(self.channel, message)
510                                 self._remember_message(message)
511                     self.watch_id = tracker.verified.added.watch(new_share)
512                     self.recent_messages = []
513                 def _remember_message(self, message):
514                     self.recent_messages.append(message)
515                     while len(self.recent_messages) > 100:
516                         self.recent_messages.pop(0)
517                 def privmsg(self, user, channel, message):
518                     if channel == self.channel:
519                         self._remember_message(message)
520                 def connectionLost(self, reason):
521                     tracker.verified.added.unwatch(self.watch_id)
522                     print 'IRC connection lost:', reason.getErrorMessage()
523             class IRCClientFactory(protocol.ReconnectingClientFactory):
524                 protocol = IRCClient
525             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
526         
527         @defer.inlineCallbacks
528         def status_thread():
529             last_str = None
530             last_time = 0
531             while True:
532                 yield deferral.sleep(3)
533                 try:
534                     if time.time() > bitcoind_work.value['last_update'] + 60:
535                         print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - bitcoind_work.value['last_update']),)
536                     
537                     height = tracker.get_height(best_share_var.value)
538                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
539                         height,
540                         len(tracker.verified.shares),
541                         len(tracker.shares),
542                         len(p2p_node.peers),
543                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
544                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
545                     
546                     datums, dt = wb.local_rate_monitor.get_datums_in_last()
547                     my_att_s = sum(datum['work']/dt for datum in datums)
548                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
549                         math.format(int(my_att_s)),
550                         math.format_dt(dt),
551                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
552                         math.format_dt(2**256 / tracker.shares[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
553                     )
554                     
555                     if height > 2:
556                         (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
557                         stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
558                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
559                         
560                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
561                             shares, stale_orphan_shares, stale_doa_shares,
562                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
563                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
564                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
565                         )
566                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
567                             math.format(int(real_att_s)),
568                             100*stale_prop,
569                             math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
570                         )
571                         
572                         for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net):
573                             print >>sys.stderr, '#'*40
574                             print >>sys.stderr, '>>> Warning: ' + warning
575                             print >>sys.stderr, '#'*40
576                     
577                     if this_str != last_str or time.time() > last_time + 15:
578                         print this_str
579                         last_str = this_str
580                         last_time = time.time()
581                 except:
582                     log.err()
583         status_thread()
584     except:
585         reactor.stop()
586         log.err(None, 'Fatal error:')
587
def run():
    """Command-line entry point: parse options, set up logging, start the reactor.

    Steps, in order:

    1. Build the argument parser and parse ``sys.argv`` (arguments may also
       be read from a file named with an ``@`` prefix).
    2. Select the network object and create the per-network data directory.
    3. Fill in bitcoind RPC credentials/ports that were not given on the
       command line from the parent network's configuration file, then fall
       back to the network defaults.
    4. Validate the worker endpoint, payout address and merged-mining URLs.
    5. Redirect ``sys.stdout``/``sys.stderr`` through the log pipes and
       optionally register the remote error reporter.
    6. Schedule ``main`` on the reactor and run the reactor.

    Invalid input is reported through ``parser.error``; the code relies on
    that call not returning (argparse convention: print usage and exit).
    """
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)

    parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    # --iocp is also inspected directly in sys.argv at import time (the
    # reactor must be installed before twisted.internet.reactor is imported);
    # it is declared here so the parser accepts and documents it.
    parser.add_argument('--iocp',
        help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
        action='store_true', default=False, dest='iocp')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    parser.add_argument('--no-bugreport',
        help='disable submitting caught exceptions to the author',
        action='store_true', default=False, dest='no_bugreport')

    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    # Registered on the p2pool group (not the top-level parser) so that it is
    # listed with the other P2P-port options in --help.
    p2pool_group.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')

    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')

    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')

    # Positional arguments: zero, one or two values (checked below).
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')

    args = parser.parse_args()

    if args.debug:
        p2pool.DEBUG = True

    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]

    # Data lives in <datadir>/<net_name>; <datadir> defaults to a 'data'
    # directory next to the launched script.
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)

    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    # Left-pad with None so that a single value is taken as the password and
    # no values leaves both username and password as None.
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]

    if args.bitcoind_rpc_password is None:
        if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
            parser.error('This network has no configuration file function. Manually enter your RPC password.')
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            # The suggested password protects the RPC interface, so draw it
            # from the OS entropy source rather than the default PRNG.
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x''' % (conf_path, random.SystemRandom().randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            # The file is read with a dummy '[x]' section header prepended
            # because RawConfigParser only accepts options inside a section.
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                # Command-line values take precedence over the file.
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')

    # Anything still unset falls back to the network defaults.
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''

    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT

    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT

    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT

    # worker_endpoint is an (interface address, port) pair; '' is used when
    # no address was given.
    try:
        if args.worker_endpoint is None:
            worker_endpoint = '', net.WORKER_PORT
        elif ':' not in args.worker_endpoint:
            worker_endpoint = '', int(args.worker_endpoint)
        else:
            addr, port = args.worker_endpoint.rsplit(':', 1)
            worker_endpoint = addr, int(port)
    except ValueError as e:
        # A non-numeric port is a usage error, not a crash.
        parser.error('error parsing worker endpoint: ' + repr(e))

    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception as e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None

    def separate_url(url):
        """Split 'scheme://user:pass@host/...' into (url without credentials, 'user:pass')."""
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        # rsplit: only the last '@' separates the credentials from the host.
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)

    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')

    # From here on stdout and stderr are both routed through `pipe`, which
    # feeds the original stderr and the log file; stderr output is wrapped
    # in a PrefixPipe with '> '.
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        # SIGUSR1 is not defined on every platform, hence the hasattr check.
        def sigusr1(signum, frame):
            """Reopen the log file on SIGUSR1."""
            print('Caught SIGUSR1, closing %r...' % (args.logfile,))
            logfile.reopen()
            print('...and reopened %r after catching SIGUSR1.' % (args.logfile,))
        signal.signal(signal.SIGUSR1, sigusr1)
    # Independently of the signal, reopen the log file every 5 seconds.
    task.LoopingCall(logfile.reopen).start(5)

    class ErrorReporter(object):
        """Twisted log observer that POSTs logged errors to the author's server."""

        def __init__(self):
            # time.time() of the last report sent, or None if none sent yet.
            self.last_sent = None

        def emit(self, eventDict):
            """Send the error described by eventDict, at most once per 5 seconds."""
            if not eventDict["isError"]:
                return

            if self.last_sent is not None and time.time() < self.last_sent + 5:
                return
            self.last_sent = time.time()

            if 'failure' in eventDict:
                text = ((eventDict.get('why') or 'Unhandled Error')
                    + '\n' + eventDict['failure'].getTraceback())
            else:
                text = " ".join([str(m) for m in eventDict["message"]]) + "\n"

            from twisted.web import client
            # addBoth(lambda x: None) discards both the response and any
            # failure of the request itself.
            client.getPage(
                url='http://u.forre.st/p2pool_error.cgi',
                method='POST',
                postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
                timeout=15,
            ).addBoth(lambda x: None)
    if not args.no_bugreport:
        log.addObserver(ErrorReporter().emit)

    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()