fixed RPC check function not actually doing anything due to mismatched parenthesis
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import base64
6 import json
7 import os
8 import random
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 if '--iocp' in sys.argv:
16     from twisted.internet import iocpreactor
17     iocpreactor.install()
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging
26 from . import p2p, networks, web, work
27 import p2pool, p2pool.data as p2pool_data
28
29 @deferral.retry('Error getting work from bitcoind:', 3)
30 @defer.inlineCallbacks
31 def getwork(bitcoind):
32     try:
33         work = yield bitcoind.rpc_getmemorypool()
34     except jsonrpc.Error, e:
35         if e.code == -32601: # Method not found
36             print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
37             raise deferral.RetrySilentlyException()
38         raise
39     packed_transactions = [x.decode('hex') for x in work['transactions']]
40     defer.returnValue(dict(
41         version=work['version'],
42         previous_block=int(work['previousblockhash'], 16),
43         transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
44         merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
45         subsidy=work['coinbasevalue'],
46         time=work['time'],
47         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
48         coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
49         clock_offset=time.time() - work['time'],
50         last_update=time.time(),
51     ))
52
53 @defer.inlineCallbacks
54 def main(args, net, datadir_path, merged_urls, worker_endpoint):
55     try:
56         print 'p2pool (version %s)' % (p2pool.__version__,)
57         print
58         
59         # connect to bitcoind over JSON-RPC and do initial getmemorypool
60         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
61         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
62         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
63         @deferral.retry('Error while checking Bitcoin connection:', 1)
64         @defer.inlineCallbacks
65         def check():
66             if not (yield net.PARENT.RPC_CHECK(bitcoind)):
67                 print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
68                 raise deferral.RetrySilentlyException()
69             temp_work = yield getwork(bitcoind)
70             if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
71                 print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
72                 raise deferral.RetrySilentlyException()
73             defer.returnValue(temp_work)
74         temp_work = yield check()
75         
76         block_height_var = variable.Variable(None)
77         @defer.inlineCallbacks
78         def poll_height():
79             block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
80         yield poll_height()
81         task.LoopingCall(poll_height).start(60*60)
82         
83         print '    ...success!'
84         print '    Current block hash: %x' % (temp_work['previous_block'],)
85         print '    Current block height: %i' % (block_height_var.value,)
86         print
87         
88         # connect to bitcoind over bitcoin-p2p
89         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
90         factory = bitcoin_p2p.ClientFactory(net.PARENT)
91         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
92         yield factory.getProtocol() # waits until handshake is successful
93         print '    ...success!'
94         print
95         
96         print 'Determining payout address...'
97         if args.pubkey_hash is None:
98             address_path = os.path.join(datadir_path, 'cached_payout_address')
99             
100             if os.path.exists(address_path):
101                 with open(address_path, 'rb') as f:
102                     address = f.read().strip('\r\n')
103                 print '    Loaded cached address: %s...' % (address,)
104             else:
105                 address = None
106             
107             if address is not None:
108                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
109                 if not res['isvalid'] or not res['ismine']:
110                     print '    Cached address is either invalid or not controlled by local bitcoind!'
111                     address = None
112             
113             if address is None:
114                 print '    Getting payout address from bitcoind...'
115                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
116             
117             with open(address_path, 'wb') as f:
118                 f.write(address)
119             
120             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
121         else:
122             my_pubkey_hash = args.pubkey_hash
123         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
124         print
125         
126         my_share_hashes = set()
127         my_doa_share_hashes = set()
128         
129         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
130         shared_share_hashes = set()
131         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
132         known_verified = set()
133         print "Loading shares..."
134         for i, (mode, contents) in enumerate(ss.get_shares()):
135             if mode == 'share':
136                 if contents.hash in tracker.shares:
137                     continue
138                 shared_share_hashes.add(contents.hash)
139                 contents.time_seen = 0
140                 tracker.add(contents)
141                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
142                     print "    %i" % (len(tracker.shares),)
143             elif mode == 'verified_hash':
144                 known_verified.add(contents)
145             else:
146                 raise AssertionError()
147         print "    ...inserting %i verified shares..." % (len(known_verified),)
148         for h in known_verified:
149             if h not in tracker.shares:
150                 ss.forget_verified_share(h)
151                 continue
152             tracker.verified.add(tracker.shares[h])
153         print "    ...done loading %i shares!" % (len(tracker.shares),)
154         print
155         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
156         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
157         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
158         
159         print 'Initializing work...'
160         
161         
162         # BITCOIND WORK
163         
164         bitcoind_work = variable.Variable((yield getwork(bitcoind)))
165         @defer.inlineCallbacks
166         def work_poller():
167             while True:
168                 flag = factory.new_block.get_deferred()
169                 try:
170                     bitcoind_work.set((yield getwork(bitcoind)))
171                 except:
172                     log.err()
173                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
174         work_poller()
175         
176         # PEER WORK
177         
178         best_block_header = variable.Variable(None)
179         def handle_header(new_header):
180             # check that header matches current target
181             if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
182                 return
183             bitcoind_best_block = bitcoind_work.value['previous_block']
184             if (best_block_header.value is None
185                 or (
186                     new_header['previous_block'] == bitcoind_best_block and
187                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
188                 ) # new is child of current and previous is current
189                 or (
190                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
191                     best_block_header.value['previous_block'] != bitcoind_best_block
192                 )): # new is current and previous is not a child of current
193                 best_block_header.set(new_header)
194         @defer.inlineCallbacks
195         def poll_header():
196             handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
197         bitcoind_work.changed.watch(lambda _: poll_header())
198         yield poll_header()
199         
200         # BEST SHARE
201         
202         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
203         requested = expiring_dict.ExpiringDict(300)
204         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
205         
206         best_share_var = variable.Variable(None)
207         def set_best_share():
208             best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
209             
210             best_share_var.set(best)
211             
212             t = time.time()
213             for peer2, share_hash in desired:
214                 if share_hash not in tracker.tails: # was received in the time tracker.think was running
215                     continue
216                 last_request_time, count = requested.get(share_hash, (None, 0))
217                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
218                     continue
219                 potential_peers = set()
220                 for head in tracker.tails[share_hash]:
221                     potential_peers.update(peer_heads.get(head, set()))
222                 potential_peers = [peer for peer in potential_peers if peer.connected2]
223                 if count == 0 and peer2 is not None and peer2.connected2:
224                     peer = peer2
225                 else:
226                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
227                     if peer is None:
228                         continue
229                 
230                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
231                 peer.send_getshares(
232                     hashes=[share_hash],
233                     parents=2000,
234                     stops=list(set(tracker.heads) | set(
235                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
236                     ))[:100],
237                 )
238                 requested[share_hash] = t, count + 1
239         bitcoind_work.changed.watch(lambda _: set_best_share())
240         set_best_share()
241         
242         
243         print '    ...success!'
244         print
245         
246         # setup p2p logic and join p2pool network
247         
248         class Node(p2p.Node):
249             def handle_shares(self, shares, peer):
250                 if len(shares) > 5:
251                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
252                 
253                 new_count = 0
254                 for share in shares:
255                     if share.hash in tracker.shares:
256                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
257                         continue
258                     
259                     new_count += 1
260                     
261                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
262                     
263                     tracker.add(share)
264                 
265                 if shares and peer is not None:
266                     peer_heads.setdefault(shares[0].hash, set()).add(peer)
267                 
268                 if new_count:
269                     set_best_share()
270                 
271                 if len(shares) > 5:
272                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
273             
274             def handle_share_hashes(self, hashes, peer):
275                 t = time.time()
276                 get_hashes = []
277                 for share_hash in hashes:
278                     if share_hash in tracker.shares:
279                         continue
280                     last_request_time, count = requested.get(share_hash, (None, 0))
281                     if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
282                         continue
283                     print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
284                     get_hashes.append(share_hash)
285                     requested[share_hash] = t, count + 1
286                 
287                 if hashes and peer is not None:
288                     peer_heads.setdefault(hashes[0], set()).add(peer)
289                 if get_hashes:
290                     peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
291             
292             def handle_get_shares(self, hashes, parents, stops, peer):
293                 parents = min(parents, 1000//len(hashes))
294                 stops = set(stops)
295                 shares = []
296                 for share_hash in hashes:
297                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
298                         if share.hash in stops:
299                             break
300                         shares.append(share)
301                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
302                 return shares
303             
304             def handle_bestblock(self, header, peer):
305                 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
306                     raise p2p.PeerMisbehavingError('received block header fails PoW test')
307                 handle_header(header)
308         
309         @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
310         def submit_block_p2p(block):
311             if factory.conn.value is None:
312                 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
313                 raise deferral.RetrySilentlyException()
314             factory.conn.value.send_block(block=block)
315         
316         @deferral.retry('Error submitting block: (will retry)', 10, 10)
317         @defer.inlineCallbacks
318         def submit_block_rpc(block, ignore_failure):
319             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
320             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
321             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
322                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
323         
324         def submit_block(block, ignore_failure):
325             submit_block_p2p(block)
326             submit_block_rpc(block, ignore_failure)
327         
328         @tracker.verified.added.watch
329         def _(share):
330             if share.pow_hash <= share.header['bits'].target:
331                 submit_block(share.as_block(tracker), ignore_failure=True)
332                 print
333                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
334                 print
335                 def spread():
336                     if (get_height_rel_highest(share.header['previous_block']) > -5 or
337                         bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
338                         broadcast_share(share.hash)
339                 spread()
340                 reactor.callLater(5, spread) # so get_height_rel_highest can update
341         
342         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
343         
344         @defer.inlineCallbacks
345         def parse(x):
346             if ':' in x:
347                 ip, port = x.split(':')
348                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
349             else:
350                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
351         
352         addrs = {}
353         if os.path.exists(os.path.join(datadir_path, 'addrs')):
354             try:
355                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
356                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
357             except:
358                 print >>sys.stderr, 'error parsing addrs'
359         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
360             try:
361                 addr = yield addr_df
362                 if addr not in addrs:
363                     addrs[addr] = (0, time.time(), time.time())
364             except:
365                 log.err()
366         
367         connect_addrs = set()
368         for addr_df in map(parse, args.p2pool_nodes):
369             try:
370                 connect_addrs.add((yield addr_df))
371             except:
372                 log.err()
373         
374         p2p_node = Node(
375             best_share_hash_func=lambda: best_share_var.value,
376             port=args.p2pool_port,
377             net=net,
378             addr_store=addrs,
379             connect_addrs=connect_addrs,
380             max_incoming_conns=args.p2pool_conns,
381         )
382         p2p_node.start()
383         
384         def save_addrs():
385             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
386                 f.write(json.dumps(p2p_node.addr_store.items()))
387         task.LoopingCall(save_addrs).start(60)
388         
389         @best_block_header.changed.watch
390         def _(header):
391             for peer in p2p_node.peers.itervalues():
392                 peer.send_bestblock(header=header)
393         
394         def broadcast_share(share_hash):
395             shares = []
396             for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
397                 if share.hash in shared_share_hashes:
398                     break
399                 shared_share_hashes.add(share.hash)
400                 shares.append(share)
401             
402             for peer in p2p_node.peers.itervalues():
403                 peer.sendShares([share for share in shares if share.peer is not peer])
404         
405         # send share when the chain changes to their chain
406         best_share_var.changed.watch(broadcast_share)
407         
408         def save_shares():
409             for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
410                 ss.add_share(share)
411                 if share.hash in tracker.verified.shares:
412                     ss.add_verified_hash(share.hash)
413         task.LoopingCall(save_shares).start(60)
414         
415         print '    ...success!'
416         print
417         
418         if args.upnp:
419             @defer.inlineCallbacks
420             def upnp_thread():
421                 while True:
422                     try:
423                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
424                         if is_lan:
425                             pm = yield portmapper.get_port_mapper()
426                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
427                     except defer.TimeoutError:
428                         pass
429                     except:
430                         if p2pool.DEBUG:
431                             log.err(None, 'UPnP error:')
432                     yield deferral.sleep(random.expovariate(1/120))
433             upnp_thread()
434         
435         # start listening for workers with a JSON-RPC server
436         
437         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
438         
439         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
440         
441         wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, shared_share_hashes, block_height_var)
442         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var)
443         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
444         
445         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
446         
447         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
448             pass
449         
450         print '    ...success!'
451         print
452         
453         
454         # done!
455         print 'Started successfully!'
456         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
457         if args.donation_percentage > 0.51:
458             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
459         elif args.donation_percentage < 0.49:
460             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
461         else:
462             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
463             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
464         print
465         
466         
467         if hasattr(signal, 'SIGALRM'):
468             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
469                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
470             ))
471             signal.siginterrupt(signal.SIGALRM, False)
472             task.LoopingCall(signal.alarm, 30).start(1)
473         
474         if args.irc_announce:
475             from twisted.words.protocols import irc
476             class IRCClient(irc.IRCClient):
477                 nickname = 'p2pool%02i' % (random.randrange(100),)
478                 channel = net.ANNOUNCE_CHANNEL
479                 def lineReceived(self, line):
480                     if p2pool.DEBUG:
481                         print repr(line)
482                     irc.IRCClient.lineReceived(self, line)
483                 def signedOn(self):
484                     irc.IRCClient.signedOn(self)
485                     self.factory.resetDelay()
486                     self.join(self.channel)
487                     @defer.inlineCallbacks
488                     def new_share(share):
489                         if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
490                             yield deferral.sleep(random.expovariate(1/60))
491                             message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
492                             if message not in self.recent_messages:
493                                 self.say(self.channel, message)
494                                 self._remember_message(message)
495                     self.watch_id = tracker.verified.added.watch(new_share)
496                     self.recent_messages = []
497                 def _remember_message(self, message):
498                     self.recent_messages.append(message)
499                     while len(self.recent_messages) > 100:
500                         self.recent_messages.pop(0)
501                 def privmsg(self, user, channel, message):
502                     if channel == self.channel:
503                         self._remember_message(message)
504                 def connectionLost(self, reason):
505                     tracker.verified.added.unwatch(self.watch_id)
506                     print 'IRC connection lost:', reason.getErrorMessage()
507             class IRCClientFactory(protocol.ReconnectingClientFactory):
508                 protocol = IRCClient
509             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
510         
511         @defer.inlineCallbacks
512         def status_thread():
513             last_str = None
514             last_time = 0
515             while True:
516                 yield deferral.sleep(3)
517                 try:
518                     if time.time() > bitcoind_work.value['last_update'] + 60:
519                         print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - bitcoind_work.value['last_update']),)
520                     
521                     height = tracker.get_height(best_share_var.value)
522                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
523                         height,
524                         len(tracker.verified.shares),
525                         len(tracker.shares),
526                         len(p2p_node.peers),
527                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
528                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
529                     
530                     datums, dt = wb.local_rate_monitor.get_datums_in_last()
531                     my_att_s = sum(datum['work']/dt for datum in datums)
532                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
533                         math.format(int(my_att_s)),
534                         math.format_dt(dt),
535                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
536                         math.format_dt(2**256 / tracker.shares[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
537                     )
538                     
539                     if height > 2:
540                         (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
541                         stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
542                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
543                         
544                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
545                             shares, stale_orphan_shares, stale_doa_shares,
546                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
547                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
548                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
549                         )
550                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
551                             math.format(int(real_att_s)),
552                             100*stale_prop,
553                             math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
554                         )
555                         
556                         for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net):
557                             print >>sys.stderr, '#'*40
558                             print >>sys.stderr, '>>> Warning: ' + warning
559                             print >>sys.stderr, '#'*40
560                     
561                     if this_str != last_str or time.time() > last_time + 15:
562                         print this_str
563                         last_str = this_str
564                         last_time = time.time()
565                 except:
566                     log.err()
567         status_thread()
568     except:
569         reactor.stop()
570         log.err(None, 'Fatal error:')
571
572 def run():
573     realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
574     
575     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
576     parser.add_argument('--version', action='version', version=p2pool.__version__)
577     parser.add_argument('--net',
578         help='use specified network (default: bitcoin)',
579         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
580     parser.add_argument('--testnet',
581         help='''use the network's testnet''',
582         action='store_const', const=True, default=False, dest='testnet')
583     parser.add_argument('--debug',
584         help='enable debugging mode',
585         action='store_const', const=True, default=False, dest='debug')
586     parser.add_argument('-a', '--address',
587         help='generate payouts to this address (default: <address requested from bitcoind>)',
588         type=str, action='store', default=None, dest='address')
589     parser.add_argument('--datadir',
590         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
591         type=str, action='store', default=None, dest='datadir')
592     parser.add_argument('--logfile',
593         help='''log to this file (default: data/<NET>/log)''',
594         type=str, action='store', default=None, dest='logfile')
595     parser.add_argument('--merged',
596         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
597         type=str, action='append', default=[], dest='merged_urls')
598     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
599         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
600         type=float, action='store', default=0.5, dest='donation_percentage')
601     parser.add_argument('--iocp',
602         help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
603         action='store_true', default=False, dest='iocp')
604     parser.add_argument('--irc-announce',
605         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
606         action='store_true', default=False, dest='irc_announce')
607     parser.add_argument('--no-bugreport',
608         help='disable submitting caught exceptions to the author',
609         action='store_true', default=False, dest='no_bugreport')
610     
611     p2pool_group = parser.add_argument_group('p2pool interface')
612     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
613         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
614         type=int, action='store', default=None, dest='p2pool_port')
615     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
616         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
617         type=str, action='append', default=[], dest='p2pool_nodes')
618     parser.add_argument('--disable-upnp',
619         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
620         action='store_false', default=True, dest='upnp')
621     p2pool_group.add_argument('--max-conns', metavar='CONNS',
622         help='maximum incoming connections (default: 40)',
623         type=int, action='store', default=40, dest='p2pool_conns')
624     
625     worker_group = parser.add_argument_group('worker interface')
626     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
627         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
628         type=str, action='store', default=None, dest='worker_endpoint')
629     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
630         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
631         type=float, action='store', default=0, dest='worker_fee')
632     
633     bitcoind_group = parser.add_argument_group('bitcoind interface')
634     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
635         help='connect to this address (default: 127.0.0.1)',
636         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
637     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
638         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
639         type=int, action='store', default=None, dest='bitcoind_rpc_port')
640     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
641         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
642         type=int, action='store', default=None, dest='bitcoind_p2p_port')
643     
644     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
645         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
646         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
647     
648     args = parser.parse_args()
649     
650     if args.debug:
651         p2pool.DEBUG = True
652     
653     net_name = args.net_name + ('_testnet' if args.testnet else '')
654     net = networks.nets[net_name]
655     
656     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
657     if not os.path.exists(datadir_path):
658         os.makedirs(datadir_path)
659     
660     if len(args.bitcoind_rpc_userpass) > 2:
661         parser.error('a maximum of two arguments are allowed')
662     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
663     
664     if args.bitcoind_rpc_password is None:
665         if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
666             parser.error('This network has no configuration file function. Manually enter your RPC password.')
667         conf_path = net.PARENT.CONF_FILE_FUNC()
668         if not os.path.exists(conf_path):
669             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
670                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
671                 '''\r\n'''
672                 '''server=1\r\n'''
673                 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
674         with open(conf_path, 'rb') as f:
675             cp = ConfigParser.RawConfigParser()
676             cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
677             for conf_name, var_name, var_type in [
678                 ('rpcuser', 'bitcoind_rpc_username', str),
679                 ('rpcpassword', 'bitcoind_rpc_password', str),
680                 ('rpcport', 'bitcoind_rpc_port', int),
681                 ('port', 'bitcoind_p2p_port', int),
682             ]:
683                 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
684                     setattr(args, var_name, var_type(cp.get('x', conf_name)))
685         if args.bitcoind_rpc_password is None:
686             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
687     
688     if args.bitcoind_rpc_username is None:
689         args.bitcoind_rpc_username = ''
690     
691     if args.bitcoind_rpc_port is None:
692         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
693     
694     if args.bitcoind_p2p_port is None:
695         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
696     
697     if args.p2pool_port is None:
698         args.p2pool_port = net.P2P_PORT
699     
700     if args.worker_endpoint is None:
701         worker_endpoint = '', net.WORKER_PORT
702     elif ':' not in args.worker_endpoint:
703         worker_endpoint = '', int(args.worker_endpoint)
704     else:
705         addr, port = args.worker_endpoint.rsplit(':', 1)
706         worker_endpoint = addr, int(port)
707     
708     if args.address is not None:
709         try:
710             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
711         except Exception, e:
712             parser.error('error parsing address: ' + repr(e))
713     else:
714         args.pubkey_hash = None
715     
716     def separate_url(url):
717         s = urlparse.urlsplit(url)
718         if '@' not in s.netloc:
719             parser.error('merged url netloc must contain an "@"')
720         userpass, new_netloc = s.netloc.rsplit('@', 1)
721         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
722     merged_urls = map(separate_url, args.merged_urls)
723     
724     if args.logfile is None:
725         args.logfile = os.path.join(datadir_path, 'log')
726     
727     logfile = logging.LogFile(args.logfile)
728     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
729     sys.stdout = logging.AbortPipe(pipe)
730     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
731     if hasattr(signal, "SIGUSR1"):
732         def sigusr1(signum, frame):
733             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
734             logfile.reopen()
735             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
736         signal.signal(signal.SIGUSR1, sigusr1)
737     task.LoopingCall(logfile.reopen).start(5)
738     
739     class ErrorReporter(object):
740         def __init__(self):
741             self.last_sent = None
742         
743         def emit(self, eventDict):
744             if not eventDict["isError"]:
745                 return
746             
747             if self.last_sent is not None and time.time() < self.last_sent + 5:
748                 return
749             self.last_sent = time.time()
750             
751             if 'failure' in eventDict:
752                 text = ((eventDict.get('why') or 'Unhandled Error')
753                     + '\n' + eventDict['failure'].getTraceback())
754             else:
755                 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
756             
757             from twisted.web import client
758             client.getPage(
759                 url='http://u.forre.st/p2pool_error.cgi',
760                 method='POST',
761                 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
762                 timeout=15,
763             ).addBoth(lambda x: None)
764     if not args.no_bugreport:
765         log.addObserver(ErrorReporter().emit)
766     
767     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
768     reactor.run()