cleaned up share creation
p2pool.git / p2pool/main.py
from __future__ import division

import base64
import json
import os
import random
import sys
import time
import signal
import traceback
import urlparse

if '--iocp' in sys.argv:
    from twisted.internet import iocpreactor
    iocpreactor.install()
from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import fixargparse, jsonrpc, variable, deferral, math, logging
from . import p2p, networks, web, work
import p2pool, p2pool.data as p2pool_data

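# getwork: fetch a block template from bitcoind over JSON-RPC, using either
# getmemorypool or getblocktemplate and falling back to the other call if
# bitcoind reports "Method not found", then normalize the result into the
# dict the rest of p2pool expects.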
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, use_getblocktemplate=False):
    def go():
        if use_getblocktemplate:
            return bitcoind.rpc_getblocktemplate(dict(mode='template'))
        else:
            return bitcoind.rpc_getmemorypool()
    try:
        work = yield go()
    except jsonrpc.Error_for_code(-32601): # Method not found
        use_getblocktemplate = not use_getblocktemplate
        try:
            work = yield go()
        except jsonrpc.Error_for_code(-32601): # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
    if 'height' not in work:
        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    elif p2pool.DEBUG:
        assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    defer.returnValue(dict(
        version=work['version'],
        previous_block=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        subsidy=work['coinbasevalue'],
        time=work['time'] if 'time' in work else work['curtime'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
        height=work['height'],
        last_update=time.time(),
        use_getblocktemplate=use_getblocktemplate,
    ))

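# main() brings the node up: it checks the bitcoind RPC and P2P connections,
# determines a payout address, loads saved shares, joins the p2pool network,
# starts the worker (miner) interface, and then runs the periodic maintenance
# and status loops until the reactor stops.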
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        traffic_happened = variable.Event()
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
                print >>sys.stderr, '    Bitcoin version too old! Upgrade to 0.6.4 or newer!'
                raise deferral.RetrySilentlyException()
        yield check()
        temp_work = yield getwork(bitcoind)
        
        block_height_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_height():
            block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
        yield poll_height()
        task.LoopingCall(poll_height).start(60*60)
        
        bitcoind_warning_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_warnings():
            errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
            bitcoind_warning_var.set(errors if errors != '' else None)
        yield poll_warnings()
        task.LoopingCall(poll_warnings).start(20*60)
        
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block'],)
        print '    Current block height: %i' % (block_height_var.value,)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
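        # my_share_hashes and my_doa_share_hashes record the hashes of shares
        # produced by local workers (dead-on-arrival ones separately) so that
        # local stale rates can be reported; ShareStore persists shares to disk
        # under datadir so the share chain survives restarts.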
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.items:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.items) % 1000 == 0 and tracker.items:
                    print "    %i" % (len(tracker.items),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.items:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.items[h])
        print "    ...done loading %i shares!" % (len(tracker.items),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        print 'Initializing work...'
        
        
        # BITCOIND WORK
        
        bitcoind_work = variable.Variable((yield getwork(bitcoind)))
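        # Poll bitcoind for fresh work every 15 seconds, or as soon as the P2P
        # factory signals that a new block has arrived.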
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        # PEER WORK
        
        best_block_header = variable.Variable(None)
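        # handle_header keeps best_block_header pointing at the best known header
        # that is consistent with bitcoind's current best block: it accepts a header
        # that is a child of the current best block (when we already hold the best
        # block's header), or the best block's header itself (when we don't).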
        def handle_header(new_header):
            # check that header matches current target
            if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
                return
            bitcoind_best_block = bitcoind_work.value['previous_block']
            if (best_block_header.value is None
                or (
                    new_header['previous_block'] == bitcoind_best_block and
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
                ) # new is child of current and previous is current
                or (
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
                    best_block_header.value['previous_block'] != bitcoind_best_block
                )): # new is current and previous is not a child of current
                best_block_header.set(new_header)
        @defer.inlineCallbacks
        def poll_header():
            handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
        bitcoind_work.changed.watch(lambda _: poll_header())
        yield deferral.retry('Error while requesting best block header:')(poll_header)()
        
        # BEST SHARE
        
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
        
        best_share_var = variable.Variable(None)
        desired_var = variable.Variable(None)
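        # set_best_share re-runs the tracker's chain selection against the latest
        # bitcoind work and publishes the chosen share chain head plus the list of
        # desired (missing) shares.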
        def set_best_share():
            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
            
            best_share_var.set(best)
            desired_var.set(desired)
        bitcoind_work.changed.watch(lambda _: set_best_share())
        set_best_share()
        
        print '    ...success!'
        print
        
        # setup p2p logic and join p2pool network
        
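        # Node extends p2p.Node with p2pool's application logic: accepting shares
        # and share-hash announcements from peers, answering get_shares requests,
        # and validating bestblock (header) announcements.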
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.items:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if new_count:
                    set_best_share()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
            
            @defer.inlineCallbacks
            def handle_share_hashes(self, hashes, peer):
                new_hashes = [x for x in hashes if x not in tracker.items]
                if not new_hashes:
                    return
                try:
                    shares = yield peer.get_shares(
                        hashes=new_hashes,
                        parents=0,
                        stops=[],
                    )
                except:
                    log.err(None, 'in handle_share_hashes:')
                else:
                    self.handle_shares(shares, peer)
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
            
            def handle_bestblock(self, header, peer):
                if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
                    raise p2p.PeerMisbehavingError('received block header fails PoW test')
                handle_header(header)
        
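        # Found blocks are submitted both over the bitcoind P2P connection and over
        # JSON-RPC (submitblock, or getmemorypool for older bitcoinds), so the block
        # is not lost if one path fails.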
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
        
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            if bitcoind_work.value['use_getblocktemplate']:
                result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result is None
            else:
                result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
        
        def submit_block(block, ignore_failure):
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
        
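        # Whenever a verified share also meets the full Bitcoin block target, submit
        # it as a block and broadcast the corresponding share to peers (again after
        # 5 seconds, once get_height_rel_highest has had a chance to update).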
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
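        # Peer addresses are remembered across runs in datadir/addrs and seeded with
        # the network's bootstrap addresses; -n/--p2pool-node peers are connected to
        # directly in addition to whatever the address store provides.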
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs')):
            try:
                with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
                    addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
            except:
                print >>sys.stderr, 'error parsing addrs'
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: best_share_var.value,
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
            traffic_happened=traffic_happened,
        )
        p2p_node.start()
        
        def save_addrs():
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
        task.LoopingCall(save_addrs).start(60)
        
        @best_block_header.changed.watch
        def _(header):
            for peer in p2p_node.peers.itervalues():
                peer.send_bestblock(header=header)
        
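        # broadcast_share walks back along the chain from the given share, collecting
        # up to 5 shares that haven't been sent to the network yet, and forwards them
        # to every peer except the one each share came from.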
        @defer.inlineCallbacks
        def broadcast_share(share_hash):
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in list(p2p_node.peers.itervalues()):
                yield peer.sendShares([share for share in shares if share.peer is not peer])
        
        # send share when the chain changes to their chain
        best_share_var.changed.watch(broadcast_share)
        
        def save_shares():
            for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.items:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
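        # download_shares runs forever: whenever the tracker wants shares it doesn't
        # have, pick one at random, ask a random peer for it (plus up to 500 parents),
        # and feed whatever comes back through the normal share-handling path.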
        @apply
        @defer.inlineCallbacks
        def download_shares():
            while True:
                desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
                peer2, share_hash = random.choice(desired)
                
                if len(p2p_node.peers) == 0:
                    yield deferral.sleep(1)
                    continue
                peer = random.choice(p2p_node.peers.values())
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                try:
                    shares = yield peer.get_shares(
                        hashes=[share_hash],
                        parents=500,
                        stops=[],
                    )
                except:
                    log.err(None, 'in download_shares:')
                    continue
                
                if not shares:
                    yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
                    continue
                p2p_node.handle_shares(shares, peer)
        
        print '    ...success!'
        print
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
        
        wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
        web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened)
        worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
        print
        
        
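        # Watchdog: re-arm a 30 second SIGALRM every second; if the reactor thread
        # stalls long enough for the alarm to fire, a stack trace is written to stderr.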
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
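        # status_thread prints a status summary every 3 seconds when it changes (and
        # at least every 15 seconds): share chain height, peers, local and pool hash
        # rates, stale rates, expected payout, and any p2pool/bitcoind warnings.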
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    height = tracker.get_height(best_share_var.value)
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.items),
                        len(tracker.items),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = wb.local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
                        )
                        
                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        reactor.stop()
        log.err(None, 'Fatal error:')

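# run() parses the command line, fills in missing bitcoind settings from
# bitcoin.conf, sets up logging and the optional bug reporter, and then hands
# control to main() under the Twisted reactor.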
def run():
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--iocp',
        help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
        action='store_true', default=False, dest='iocp')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    parser.add_argument('--no-bugreport',
        help='disable submitting caught exceptions to the author',
        action='store_true', default=False, dest='no_bugreport')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-rpc-ssl',
        help='connect to JSON-RPC interface using SSL',
        action='store_true', default=False, dest='bitcoind_rpc_ssl')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
        defer.setDebugging(True)
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    if args.bitcoind_rpc_password is None:
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x\r\n'''
                '''\r\n'''
                '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
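        # Parse simple key=value lines from bitcoin.conf (ignoring comments) so that
        # rpcuser/rpcpassword/rpcport/port can fill in any settings not given on the
        # command line.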
        conf = open(conf_path, 'rb').read()
        contents = {}
        for line in conf.splitlines(True):
            if '#' in line:
                line = line[:line.index('#')]
            if '=' not in line:
                continue
            k, v = line.split('=', 1)
            contents[k.strip()] = v.strip()
        for conf_name, var_name, var_type in [
            ('rpcuser', 'bitcoind_rpc_username', str),
            ('rpcpassword', 'bitcoind_rpc_password', str),
            ('rpcport', 'bitcoind_rpc_port', int),
            ('port', 'bitcoind_p2p_port', int),
        ]:
            if getattr(args, var_name) is None and conf_name in contents:
                setattr(args, var_name, var_type(contents[conf_name]))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
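    # Route stdout and stderr through a timestamping tee so that everything is
    # echoed to the console and appended to the log file, which is reopened
    # periodically (and on SIGUSR1) so it can be rotated externally.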
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
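    # ErrorReporter is a Twisted log observer that, unless --no-bugreport is given,
    # POSTs error tracebacks to the author's server (u.forre.st), rate-limited to
    # at most one report every 5 seconds.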
    class ErrorReporter(object):
        def __init__(self):
            self.last_sent = None
        
        def emit(self, eventDict):
            if not eventDict["isError"]:
                return
            
            if self.last_sent is not None and time.time() < self.last_sent + 5:
                return
            self.last_sent = time.time()
            
            if 'failure' in eventDict:
                text = ((eventDict.get('why') or 'Unhandled Error')
                    + '\n' + eventDict['failure'].getTraceback())
            else:
                text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
            
            from twisted.web import client
            client.getPage(
                url='http://u.forre.st/p2pool_error.cgi',
                method='POST',
                postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
                timeout=15,
            ).addBoth(lambda x: None)
    if not args.no_bugreport:
        log.addObserver(ErrorReporter().emit)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()