broadcast shares in serial
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import ConfigParser
4 import StringIO
5 import base64
6 import json
7 import os
8 import random
9 import sys
10 import time
11 import signal
12 import traceback
13 import urlparse
14
15 if '--iocp' in sys.argv:
16     from twisted.internet import iocpreactor
17     iocpreactor.install()
18 from twisted.internet import defer, reactor, protocol, task
19 from twisted.web import server
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface, height_tracker
25 from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging
26 from . import p2p, networks, web, work
27 import p2pool, p2pool.data as p2pool_data
28
29 @deferral.retry('Error getting work from bitcoind:', 3)
30 @defer.inlineCallbacks
31 def getwork(bitcoind):
32     try:
33         work = yield bitcoind.rpc_getmemorypool()
34     except jsonrpc.Error, e:
35         if e.code == -32601: # Method not found
36             print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
37             raise deferral.RetrySilentlyException()
38         raise
39     packed_transactions = [x.decode('hex') for x in work['transactions']]
40     defer.returnValue(dict(
41         version=work['version'],
42         previous_block=int(work['previousblockhash'], 16),
43         transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
44         merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
45         subsidy=work['coinbasevalue'],
46         time=work['time'],
47         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
48         coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
49         clock_offset=time.time() - work['time'],
50         last_update=time.time(),
51     ))
52
53 @defer.inlineCallbacks
54 def main(args, net, datadir_path, merged_urls, worker_endpoint):
55     try:
56         print 'p2pool (version %s)' % (p2pool.__version__,)
57         print
58         
59         # connect to bitcoind over JSON-RPC and do initial getmemorypool
60         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
61         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
62         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
63         @deferral.retry('Error while checking Bitcoin connection:', 1)
64         @defer.inlineCallbacks
65         def check():
66             if not (yield net.PARENT.RPC_CHECK(bitcoind)):
67                 print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
68                 raise deferral.RetrySilentlyException()
69             temp_work = yield getwork(bitcoind)
70             if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
71                 print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
72                 raise deferral.RetrySilentlyException()
73             defer.returnValue(temp_work)
74         temp_work = yield check()
75         
76         block_height_var = variable.Variable(None)
77         @defer.inlineCallbacks
78         def poll_height():
79             block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
80         yield poll_height()
81         task.LoopingCall(poll_height).start(60*60)
82         
83         print '    ...success!'
84         print '    Current block hash: %x' % (temp_work['previous_block'],)
85         print '    Current block height: %i' % (block_height_var.value,)
86         print
87         
88         # connect to bitcoind over bitcoin-p2p
89         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
90         factory = bitcoin_p2p.ClientFactory(net.PARENT)
91         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
92         yield factory.getProtocol() # waits until handshake is successful
93         print '    ...success!'
94         print
95         
96         print 'Determining payout address...'
97         if args.pubkey_hash is None:
98             address_path = os.path.join(datadir_path, 'cached_payout_address')
99             
100             if os.path.exists(address_path):
101                 with open(address_path, 'rb') as f:
102                     address = f.read().strip('\r\n')
103                 print '    Loaded cached address: %s...' % (address,)
104             else:
105                 address = None
106             
107             if address is not None:
108                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
109                 if not res['isvalid'] or not res['ismine']:
110                     print '    Cached address is either invalid or not controlled by local bitcoind!'
111                     address = None
112             
113             if address is None:
114                 print '    Getting payout address from bitcoind...'
115                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
116             
117             with open(address_path, 'wb') as f:
118                 f.write(address)
119             
120             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
121         else:
122             my_pubkey_hash = args.pubkey_hash
123         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
124         print
125         
126         my_share_hashes = set()
127         my_doa_share_hashes = set()
128         
129         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
130         shared_share_hashes = set()
131         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
132         known_verified = set()
133         print "Loading shares..."
134         for i, (mode, contents) in enumerate(ss.get_shares()):
135             if mode == 'share':
136                 if contents.hash in tracker.items:
137                     continue
138                 shared_share_hashes.add(contents.hash)
139                 contents.time_seen = 0
140                 tracker.add(contents)
141                 if len(tracker.items) % 1000 == 0 and tracker.items:
142                     print "    %i" % (len(tracker.items),)
143             elif mode == 'verified_hash':
144                 known_verified.add(contents)
145             else:
146                 raise AssertionError()
147         print "    ...inserting %i verified shares..." % (len(known_verified),)
148         for h in known_verified:
149             if h not in tracker.items:
150                 ss.forget_verified_share(h)
151                 continue
152             tracker.verified.add(tracker.items[h])
153         print "    ...done loading %i shares!" % (len(tracker.items),)
154         print
155         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
156         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
157         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
158         
159         print 'Initializing work...'
160         
161         
162         # BITCOIND WORK
163         
164         bitcoind_work = variable.Variable((yield getwork(bitcoind)))
165         @defer.inlineCallbacks
166         def work_poller():
167             while True:
168                 flag = factory.new_block.get_deferred()
169                 try:
170                     bitcoind_work.set((yield getwork(bitcoind)))
171                 except:
172                     log.err()
173                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
174         work_poller()
175         
176         # PEER WORK
177         
178         best_block_header = variable.Variable(None)
179         def handle_header(new_header):
180             # check that header matches current target
181             if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
182                 return
183             bitcoind_best_block = bitcoind_work.value['previous_block']
184             if (best_block_header.value is None
185                 or (
186                     new_header['previous_block'] == bitcoind_best_block and
187                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
188                 ) # new is child of current and previous is current
189                 or (
190                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
191                     best_block_header.value['previous_block'] != bitcoind_best_block
192                 )): # new is current and previous is not a child of current
193                 best_block_header.set(new_header)
194         @defer.inlineCallbacks
195         def poll_header():
196             handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
197         bitcoind_work.changed.watch(lambda _: poll_header())
198         yield poll_header()
199         
200         # BEST SHARE
201         
202         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
203         requested = expiring_dict.ExpiringDict(300)
204         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
205         
206         best_share_var = variable.Variable(None)
207         def set_best_share():
208             best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
209             
210             best_share_var.set(best)
211             
212             t = time.time()
213             for peer2, share_hash in desired:
214                 if share_hash not in tracker.tails: # was received in the time tracker.think was running
215                     continue
216                 last_request_time, count = requested.get(share_hash, (None, 0))
217                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
218                     continue
219                 potential_peers = set()
220                 for head in tracker.tails[share_hash]:
221                     potential_peers.update(peer_heads.get(head, set()))
222                 potential_peers = [peer for peer in potential_peers if peer.connected2]
223                 if count == 0 and peer2 is not None and peer2.connected2:
224                     peer = peer2
225                 else:
226                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
227                     if peer is None:
228                         continue
229                 
230                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
231                 peer.send_getshares(
232                     hashes=[share_hash],
233                     parents=2000,
234                     stops=list(set(tracker.heads) | set(
235                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
236                     ))[:100],
237                 )
238                 requested[share_hash] = t, count + 1
239         bitcoind_work.changed.watch(lambda _: set_best_share())
240         set_best_share()
241         
242         
243         print '    ...success!'
244         print
245         
246         # setup p2p logic and join p2pool network
247         
248         class Node(p2p.Node):
249             def handle_shares(self, shares, peer):
250                 if len(shares) > 5:
251                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
252                 
253                 new_count = 0
254                 for share in shares:
255                     if share.hash in tracker.items:
256                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
257                         continue
258                     
259                     new_count += 1
260                     
261                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
262                     
263                     tracker.add(share)
264                 
265                 if shares and peer is not None:
266                     peer_heads.setdefault(shares[0].hash, set()).add(peer)
267                 
268                 if new_count:
269                     set_best_share()
270                 
271                 if len(shares) > 5:
272                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
273             
274             def handle_share_hashes(self, hashes, peer):
275                 t = time.time()
276                 get_hashes = []
277                 for share_hash in hashes:
278                     if share_hash in tracker.items:
279                         continue
280                     last_request_time, count = requested.get(share_hash, (None, 0))
281                     if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
282                         continue
283                     print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
284                     get_hashes.append(share_hash)
285                     requested[share_hash] = t, count + 1
286                 
287                 if hashes and peer is not None:
288                     peer_heads.setdefault(hashes[0], set()).add(peer)
289                 if get_hashes:
290                     peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
291             
292             def handle_get_shares(self, hashes, parents, stops, peer):
293                 parents = min(parents, 1000//len(hashes))
294                 stops = set(stops)
295                 shares = []
296                 for share_hash in hashes:
297                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
298                         if share.hash in stops:
299                             break
300                         shares.append(share)
301                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
302                 return shares
303             
304             def handle_bestblock(self, header, peer):
305                 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
306                     raise p2p.PeerMisbehavingError('received block header fails PoW test')
307                 handle_header(header)
308         
309         @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
310         def submit_block_p2p(block):
311             if factory.conn.value is None:
312                 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
313                 raise deferral.RetrySilentlyException()
314             factory.conn.value.send_block(block=block)
315         
316         @deferral.retry('Error submitting block: (will retry)', 10, 10)
317         @defer.inlineCallbacks
318         def submit_block_rpc(block, ignore_failure):
319             success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
320             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
321             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
322                 print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
323         
324         def submit_block(block, ignore_failure):
325             submit_block_p2p(block)
326             submit_block_rpc(block, ignore_failure)
327         
328         @tracker.verified.added.watch
329         def _(share):
330             if share.pow_hash <= share.header['bits'].target:
331                 submit_block(share.as_block(tracker), ignore_failure=True)
332                 print
333                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
334                 print
335                 def spread():
336                     if (get_height_rel_highest(share.header['previous_block']) > -5 or
337                         bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
338                         broadcast_share(share.hash)
339                 spread()
340                 reactor.callLater(5, spread) # so get_height_rel_highest can update
341         
342         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
343         
344         @defer.inlineCallbacks
345         def parse(x):
346             if ':' in x:
347                 ip, port = x.split(':')
348                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
349             else:
350                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
351         
352         addrs = {}
353         if os.path.exists(os.path.join(datadir_path, 'addrs')):
354             try:
355                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
356                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
357             except:
358                 print >>sys.stderr, 'error parsing addrs'
359         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
360             try:
361                 addr = yield addr_df
362                 if addr not in addrs:
363                     addrs[addr] = (0, time.time(), time.time())
364             except:
365                 log.err()
366         
367         connect_addrs = set()
368         for addr_df in map(parse, args.p2pool_nodes):
369             try:
370                 connect_addrs.add((yield addr_df))
371             except:
372                 log.err()
373         
374         p2p_node = Node(
375             best_share_hash_func=lambda: best_share_var.value,
376             port=args.p2pool_port,
377             net=net,
378             addr_store=addrs,
379             connect_addrs=connect_addrs,
380             max_incoming_conns=args.p2pool_conns,
381         )
382         p2p_node.start()
383         
384         def save_addrs():
385             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
386                 f.write(json.dumps(p2p_node.addr_store.items()))
387         task.LoopingCall(save_addrs).start(60)
388         
389         @best_block_header.changed.watch
390         def _(header):
391             for peer in p2p_node.peers.itervalues():
392                 peer.send_bestblock(header=header)
393         
394         @defer.inlineCallbacks
395         def broadcast_share(share_hash):
396             shares = []
397             for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
398                 if share.hash in shared_share_hashes:
399                     break
400                 shared_share_hashes.add(share.hash)
401                 shares.append(share)
402             
403             for peer in list(p2p_node.peers.itervalues()):
404                 yield peer.sendShares([share for share in shares if share.peer is not peer])
405         
406         # send share when the chain changes to their chain
407         best_share_var.changed.watch(broadcast_share)
408         
409         def save_shares():
410             for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
411                 ss.add_share(share)
412                 if share.hash in tracker.verified.items:
413                     ss.add_verified_hash(share.hash)
414         task.LoopingCall(save_shares).start(60)
415         
416         print '    ...success!'
417         print
418         
419         if args.upnp:
420             @defer.inlineCallbacks
421             def upnp_thread():
422                 while True:
423                     try:
424                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
425                         if is_lan:
426                             pm = yield portmapper.get_port_mapper()
427                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
428                     except defer.TimeoutError:
429                         pass
430                     except:
431                         if p2pool.DEBUG:
432                             log.err(None, 'UPnP error:')
433                     yield deferral.sleep(random.expovariate(1/120))
434             upnp_thread()
435         
436         # start listening for workers with a JSON-RPC server
437         
438         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
439         
440         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
441         
442         wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
443         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var)
444         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
445         
446         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
447         
448         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
449             pass
450         
451         print '    ...success!'
452         print
453         
454         
455         # done!
456         print 'Started successfully!'
457         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
458         if args.donation_percentage > 0.51:
459             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
460         elif args.donation_percentage < 0.49:
461             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
462         else:
463             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
464             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
465         print
466         
467         
468         if hasattr(signal, 'SIGALRM'):
469             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
470                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
471             ))
472             signal.siginterrupt(signal.SIGALRM, False)
473             task.LoopingCall(signal.alarm, 30).start(1)
474         
475         if args.irc_announce:
476             from twisted.words.protocols import irc
477             class IRCClient(irc.IRCClient):
478                 nickname = 'p2pool%02i' % (random.randrange(100),)
479                 channel = net.ANNOUNCE_CHANNEL
480                 def lineReceived(self, line):
481                     if p2pool.DEBUG:
482                         print repr(line)
483                     irc.IRCClient.lineReceived(self, line)
484                 def signedOn(self):
485                     irc.IRCClient.signedOn(self)
486                     self.factory.resetDelay()
487                     self.join(self.channel)
488                     @defer.inlineCallbacks
489                     def new_share(share):
490                         if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
491                             yield deferral.sleep(random.expovariate(1/60))
492                             message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
493                             if message not in self.recent_messages:
494                                 self.say(self.channel, message)
495                                 self._remember_message(message)
496                     self.watch_id = tracker.verified.added.watch(new_share)
497                     self.recent_messages = []
498                 def _remember_message(self, message):
499                     self.recent_messages.append(message)
500                     while len(self.recent_messages) > 100:
501                         self.recent_messages.pop(0)
502                 def privmsg(self, user, channel, message):
503                     if channel == self.channel:
504                         self._remember_message(message)
505                 def connectionLost(self, reason):
506                     tracker.verified.added.unwatch(self.watch_id)
507                     print 'IRC connection lost:', reason.getErrorMessage()
508             class IRCClientFactory(protocol.ReconnectingClientFactory):
509                 protocol = IRCClient
510             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
511         
512         @defer.inlineCallbacks
513         def status_thread():
514             last_str = None
515             last_time = 0
516             while True:
517                 yield deferral.sleep(3)
518                 try:
519                     if time.time() > bitcoind_work.value['last_update'] + 60:
520                         print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for %s! Check that it isn't frozen or dead! <---''' % (math.format_dt(time.time() - bitcoind_work.value['last_update']),)
521                     
522                     height = tracker.get_height(best_share_var.value)
523                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
524                         height,
525                         len(tracker.verified.items),
526                         len(tracker.items),
527                         len(p2p_node.peers),
528                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
529                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
530                     
531                     datums, dt = wb.local_rate_monitor.get_datums_in_last()
532                     my_att_s = sum(datum['work']/dt for datum in datums)
533                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
534                         math.format(int(my_att_s)),
535                         math.format_dt(dt),
536                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
537                         math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
538                     )
539                     
540                     if height > 2:
541                         (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
542                         stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
543                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
544                         
545                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
546                             shares, stale_orphan_shares, stale_doa_shares,
547                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
548                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
549                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
550                         )
551                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
552                             math.format(int(real_att_s)),
553                             100*stale_prop,
554                             math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
555                         )
556                         
557                         for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net):
558                             print >>sys.stderr, '#'*40
559                             print >>sys.stderr, '>>> Warning: ' + warning
560                             print >>sys.stderr, '#'*40
561                     
562                     if this_str != last_str or time.time() > last_time + 15:
563                         print this_str
564                         last_str = this_str
565                         last_time = time.time()
566                 except:
567                     log.err()
568         status_thread()
569     except:
570         reactor.stop()
571         log.err(None, 'Fatal error:')
572
573 def run():
574     realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
575     
576     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
577     parser.add_argument('--version', action='version', version=p2pool.__version__)
578     parser.add_argument('--net',
579         help='use specified network (default: bitcoin)',
580         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
581     parser.add_argument('--testnet',
582         help='''use the network's testnet''',
583         action='store_const', const=True, default=False, dest='testnet')
584     parser.add_argument('--debug',
585         help='enable debugging mode',
586         action='store_const', const=True, default=False, dest='debug')
587     parser.add_argument('-a', '--address',
588         help='generate payouts to this address (default: <address requested from bitcoind>)',
589         type=str, action='store', default=None, dest='address')
590     parser.add_argument('--datadir',
591         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
592         type=str, action='store', default=None, dest='datadir')
593     parser.add_argument('--logfile',
594         help='''log to this file (default: data/<NET>/log)''',
595         type=str, action='store', default=None, dest='logfile')
596     parser.add_argument('--merged',
597         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
598         type=str, action='append', default=[], dest='merged_urls')
599     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
600         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
601         type=float, action='store', default=0.5, dest='donation_percentage')
602     parser.add_argument('--iocp',
603         help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
604         action='store_true', default=False, dest='iocp')
605     parser.add_argument('--irc-announce',
606         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
607         action='store_true', default=False, dest='irc_announce')
608     parser.add_argument('--no-bugreport',
609         help='disable submitting caught exceptions to the author',
610         action='store_true', default=False, dest='no_bugreport')
611     
612     p2pool_group = parser.add_argument_group('p2pool interface')
613     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
614         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
615         type=int, action='store', default=None, dest='p2pool_port')
616     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
617         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
618         type=str, action='append', default=[], dest='p2pool_nodes')
619     parser.add_argument('--disable-upnp',
620         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
621         action='store_false', default=True, dest='upnp')
622     p2pool_group.add_argument('--max-conns', metavar='CONNS',
623         help='maximum incoming connections (default: 40)',
624         type=int, action='store', default=40, dest='p2pool_conns')
625     
626     worker_group = parser.add_argument_group('worker interface')
627     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
628         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
629         type=str, action='store', default=None, dest='worker_endpoint')
630     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
631         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
632         type=float, action='store', default=0, dest='worker_fee')
633     
634     bitcoind_group = parser.add_argument_group('bitcoind interface')
635     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
636         help='connect to this address (default: 127.0.0.1)',
637         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
638     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
639         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
640         type=int, action='store', default=None, dest='bitcoind_rpc_port')
641     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
642         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
643         type=int, action='store', default=None, dest='bitcoind_p2p_port')
644     
645     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
646         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
647         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
648     
649     args = parser.parse_args()
650     
651     if args.debug:
652         p2pool.DEBUG = True
653         defer.setDebugging(True)
654     
655     net_name = args.net_name + ('_testnet' if args.testnet else '')
656     net = networks.nets[net_name]
657     
658     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
659     if not os.path.exists(datadir_path):
660         os.makedirs(datadir_path)
661     
662     if len(args.bitcoind_rpc_userpass) > 2:
663         parser.error('a maximum of two arguments are allowed')
664     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
665     
666     if args.bitcoind_rpc_password is None:
667         if not hasattr(net.PARENT, 'CONF_FILE_FUNC'):
668             parser.error('This network has no configuration file function. Manually enter your RPC password.')
669         conf_path = net.PARENT.CONF_FILE_FUNC()
670         if not os.path.exists(conf_path):
671             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
672                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
673                 '''\r\n'''
674                 '''server=1\r\n'''
675                 '''rpcpassword=%x''' % (conf_path, random.randrange(2**128)))
676         with open(conf_path, 'rb') as f:
677             cp = ConfigParser.RawConfigParser()
678             cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
679             for conf_name, var_name, var_type in [
680                 ('rpcuser', 'bitcoind_rpc_username', str),
681                 ('rpcpassword', 'bitcoind_rpc_password', str),
682                 ('rpcport', 'bitcoind_rpc_port', int),
683                 ('port', 'bitcoind_p2p_port', int),
684             ]:
685                 if getattr(args, var_name) is None and cp.has_option('x', conf_name):
686                     setattr(args, var_name, var_type(cp.get('x', conf_name)))
687         if args.bitcoind_rpc_password is None:
688             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
689     
690     if args.bitcoind_rpc_username is None:
691         args.bitcoind_rpc_username = ''
692     
693     if args.bitcoind_rpc_port is None:
694         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
695     
696     if args.bitcoind_p2p_port is None:
697         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
698     
699     if args.p2pool_port is None:
700         args.p2pool_port = net.P2P_PORT
701     
702     if args.worker_endpoint is None:
703         worker_endpoint = '', net.WORKER_PORT
704     elif ':' not in args.worker_endpoint:
705         worker_endpoint = '', int(args.worker_endpoint)
706     else:
707         addr, port = args.worker_endpoint.rsplit(':', 1)
708         worker_endpoint = addr, int(port)
709     
710     if args.address is not None:
711         try:
712             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
713         except Exception, e:
714             parser.error('error parsing address: ' + repr(e))
715     else:
716         args.pubkey_hash = None
717     
718     def separate_url(url):
719         s = urlparse.urlsplit(url)
720         if '@' not in s.netloc:
721             parser.error('merged url netloc must contain an "@"')
722         userpass, new_netloc = s.netloc.rsplit('@', 1)
723         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
724     merged_urls = map(separate_url, args.merged_urls)
725     
726     if args.logfile is None:
727         args.logfile = os.path.join(datadir_path, 'log')
728     
729     logfile = logging.LogFile(args.logfile)
730     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
731     sys.stdout = logging.AbortPipe(pipe)
732     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
733     if hasattr(signal, "SIGUSR1"):
734         def sigusr1(signum, frame):
735             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
736             logfile.reopen()
737             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
738         signal.signal(signal.SIGUSR1, sigusr1)
739     task.LoopingCall(logfile.reopen).start(5)
740     
741     class ErrorReporter(object):
742         def __init__(self):
743             self.last_sent = None
744         
745         def emit(self, eventDict):
746             if not eventDict["isError"]:
747                 return
748             
749             if self.last_sent is not None and time.time() < self.last_sent + 5:
750                 return
751             self.last_sent = time.time()
752             
753             if 'failure' in eventDict:
754                 text = ((eventDict.get('why') or 'Unhandled Error')
755                     + '\n' + eventDict['failure'].getTraceback())
756             else:
757                 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
758             
759             from twisted.web import client
760             client.getPage(
761                 url='http://u.forre.st/p2pool_error.cgi',
762                 method='POST',
763                 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
764                 timeout=15,
765             ).addBoth(lambda x: None)
766     if not args.no_bugreport:
767         log.addObserver(ErrorReporter().emit)
768     
769     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
770     reactor.run()