penalization and sending txs ahead of shares
p2pool.git: p2pool/main.py
1 from __future__ import division
2
3 import base64
4 import json
5 import os
6 import random
7 import sys
8 import time
9 import signal
10 import traceback
11 import urlparse
12
13 if '--iocp' in sys.argv:
14     from twisted.internet import iocpreactor
15     iocpreactor.install()
16 from twisted.internet import defer, reactor, protocol, task
17 from twisted.web import server
18 from twisted.python import log
19 from nattraverso import portmapper, ipdiscover
20
21 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
22 from bitcoin import worker_interface, height_tracker
23 from util import fixargparse, jsonrpc, variable, deferral, math, logging
24 from . import p2p, networks, web, work
25 import p2pool, p2pool.data as p2pool_data
26
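# Fetch a block template from bitcoind and normalize it into a plain dict.
# Tries getblocktemplate (or the older getmemorypool); on a "Method not found"
# error it switches to the other call once before giving up. The result carries
# the unpacked transactions, subsidy, target bits and height, plus the
# use_getblocktemplate flag so later polls reuse whichever RPC worked.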
27 @deferral.retry('Error getting work from bitcoind:', 3)
28 @defer.inlineCallbacks
29 def getwork(bitcoind, use_getblocktemplate=False):
30     def go():
31         if use_getblocktemplate:
32             return bitcoind.rpc_getblocktemplate(dict(mode='template'))
33         else:
34             return bitcoind.rpc_getmemorypool()
35     try:
36         work = yield go()
37     except jsonrpc.Error_for_code(-32601): # Method not found
38         use_getblocktemplate = not use_getblocktemplate
39         try:
40             work = yield go()
41         except jsonrpc.Error_for_code(-32601): # Method not found
42             print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
43             raise deferral.RetrySilentlyException()
44     packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
45     if 'height' not in work:
46         work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
47     elif p2pool.DEBUG:
48         assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
49     defer.returnValue(dict(
50         version=work['version'],
51         previous_block=int(work['previousblockhash'], 16),
52         transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
53         subsidy=work['coinbasevalue'],
54         time=work['time'] if 'time' in work else work['curtime'],
55         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
56         coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
57         height=work['height'],
58         last_update=time.time(),
59         use_getblocktemplate=use_getblocktemplate,
60     ))
61
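# Main entry point: connect to bitcoind (P2P and JSON-RPC), load the saved
# share chain, join the p2pool network, start the worker-facing interface, and
# keep the status and maintenance loops running until shutdown.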
62 @defer.inlineCallbacks
63 def main(args, net, datadir_path, merged_urls, worker_endpoint):
64     try:
65         print 'p2pool (version %s)' % (p2pool.__version__,)
66         print
67         
68         traffic_happened = variable.Event()
69         
70         @defer.inlineCallbacks
71         def connect_p2p():
72             # connect to bitcoind over bitcoin-p2p
73             print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
74             factory = bitcoin_p2p.ClientFactory(net.PARENT)
75             reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
76             yield factory.getProtocol() # waits until handshake is successful
77             print '    ...success!'
78             print
79             defer.returnValue(factory)
80         
81         if args.testnet: # establish p2p connection first if testnet so bitcoind can work without connections
82             factory = yield connect_p2p()
83         
84         # connect to bitcoind over JSON-RPC and do initial getmemorypool
85         url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
86         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
87         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
88         @deferral.retry('Error while checking Bitcoin connection:', 1)
89         @defer.inlineCallbacks
90         def check():
91             if not (yield net.PARENT.RPC_CHECK(bitcoind)):
92                 print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
93                 raise deferral.RetrySilentlyException()
94             if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
95                 print >>sys.stderr, '    Bitcoin version too old! Upgrade to 0.6.4 or newer!'
96                 raise deferral.RetrySilentlyException()
97         yield check()
98         temp_work = yield getwork(bitcoind)
99         
100         if not args.testnet:
101             factory = yield connect_p2p()
102         
103         block_height_var = variable.Variable(None)
104         @defer.inlineCallbacks
105         def poll_height():
106             block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
107         yield poll_height()
108         task.LoopingCall(poll_height).start(60*60)
109         
110         bitcoind_warning_var = variable.Variable(None)
111         @defer.inlineCallbacks
112         def poll_warnings():
113             errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
114             bitcoind_warning_var.set(errors if errors != '' else None)
115         yield poll_warnings()
116         task.LoopingCall(poll_warnings).start(20*60)
117         
118         print '    ...success!'
119         print '    Current block hash: %x' % (temp_work['previous_block'],)
120         print '    Current block height: %i' % (block_height_var.value,)
121         print
122         
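        # Payout address: use -a/--address if given; otherwise reuse the cached
        # address (revalidated against the local wallet with validateaddress),
        # falling back to a fresh 'p2pool' account address from bitcoind.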
123         print 'Determining payout address...'
124         if args.pubkey_hash is None:
125             address_path = os.path.join(datadir_path, 'cached_payout_address')
126             
127             if os.path.exists(address_path):
128                 with open(address_path, 'rb') as f:
129                     address = f.read().strip('\r\n')
130                 print '    Loaded cached address: %s...' % (address,)
131             else:
132                 address = None
133             
134             if address is not None:
135                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
136                 if not res['isvalid'] or not res['ismine']:
137                     print '    Cached address is either invalid or not controlled by local bitcoind!'
138                     address = None
139             
140             if address is None:
141                 print '    Getting payout address from bitcoind...'
142                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
143             
144             with open(address_path, 'wb') as f:
145                 f.write(address)
146             
147             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
148         else:
149             my_pubkey_hash = args.pubkey_hash
150         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
151         print
152         
153         my_share_hashes = set()
154         my_doa_share_hashes = set()
155         
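        # Share-chain state: OkayTracker holds the recent share chain in memory,
        # while ShareStore persists shares (and which ones were verified) on disk
        # so a restart does not have to redownload or reverify the recent chain.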
156         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
157         shared_share_hashes = set()
158         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
159         known_verified = set()
160         print "Loading shares..."
161         for i, (mode, contents) in enumerate(ss.get_shares()):
162             if mode == 'share':
163                 if contents.hash in tracker.items:
164                     continue
165                 shared_share_hashes.add(contents.hash)
166                 contents.time_seen = 0
167                 tracker.add(contents)
168                 if len(tracker.items) % 1000 == 0 and tracker.items:
169                     print "    %i" % (len(tracker.items),)
170             elif mode == 'verified_hash':
171                 known_verified.add(contents)
172             else:
173                 raise AssertionError()
174         print "    ...inserting %i verified shares..." % (len(known_verified),)
175         for h in known_verified:
176             if h not in tracker.items:
177                 ss.forget_verified_share(h)
178                 continue
179             tracker.verified.add(tracker.items[h])
180         print "    ...done loading %i shares!" % (len(tracker.items),)
181         print
182         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
183         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
184         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
185         
186         print 'Initializing work...'
187         
188         
189         # BITCOIND WORK
190         
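        # Work polling: refresh bitcoind_work immediately when a new block is
        # announced over the bitcoin P2P connection (factory.new_block), and at
        # least every 15 seconds otherwise.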
191         bitcoind_work = variable.Variable((yield getwork(bitcoind)))
192         @defer.inlineCallbacks
193         def work_poller():
194             while True:
195                 flag = factory.new_block.get_deferred()
196                 try:
197                     bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
198                 except:
199                     log.err()
200                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
201         work_poller()
202         
203         # PEER WORK
204         
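        # Track the freshest block header heard from peers so work can build on
        # a newly found block quickly. Only headers that meet the current
        # network target are accepted, preferring a child of bitcoind's best
        # block (see the inline conditions below).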
205         best_block_header = variable.Variable(None)
206         def handle_header(new_header):
207             # check that header matches current target
208             if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
209                 return
210             bitcoind_best_block = bitcoind_work.value['previous_block']
211             if (best_block_header.value is None
212                 or (
213                     new_header['previous_block'] == bitcoind_best_block and
214                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
215                 ) # new is child of current and previous is current
216                 or (
217                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
218                     best_block_header.value['previous_block'] != bitcoind_best_block
219                 )): # new is current and previous is not a child of current
220                 best_block_header.set(new_header)
221         @defer.inlineCallbacks
222         def poll_header():
223             handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
224         bitcoind_work.changed.watch(lambda _: poll_header())
225         yield deferral.retry('Error while requesting best block header:')(poll_header)()
226         
227         # BEST SHARE
228         
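        # known_txs_var holds every full transaction seen (from block templates
        # and, via the p2p node, from peers); mining_txs_var holds just the ones
        # in the current template. set_best_share() reruns tracker.think() to
        # pick the share to build on and the (peer, hash) pairs worth fetching.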
229         known_txs_var = variable.Variable({}) # hash -> tx
230         mining_txs_var = variable.Variable({}) # hash -> tx
231         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
232         
233         best_share_var = variable.Variable(None)
234         desired_var = variable.Variable(None)
235         def set_best_share():
236             best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'], known_txs_var.value)
237             
238             best_share_var.set(best)
239             desired_var.set(desired)
240         bitcoind_work.changed.watch(lambda _: set_best_share())
241         set_best_share()
242         
243         print '    ...success!'
244         print
245         
246         # setup p2p logic and join p2pool network
247         
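        # Transaction relay: keep known_txs/mining_txs in sync with the current
        # block template and forward newly learned transactions to bitcoind. The
        # p2p layer likewise passes the transactions a share needs along with it
        # (see broadcast_share below), per this change's "sending txs ahead of
        # shares".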
248         # update mining_txs according to getwork results
249         @bitcoind_work.changed.run_and_watch
250         def _(_=None):
251             new_mining_txs = {}
252             new_known_txs = dict(known_txs_var.value)
253             for tx in bitcoind_work.value['transactions']:
254                 tx_hash = bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))
255                 new_mining_txs[tx_hash] = tx
256                 new_known_txs[tx_hash] = tx
257             mining_txs_var.set(new_mining_txs)
258             known_txs_var.set(new_known_txs)
259         # forward transactions seen to bitcoind
260         @known_txs_var.transitioned.watch
261         def _(before, after):
262             for tx_hash in set(after) - set(before):
263                 factory.conn.value.send_tx(tx=after[tx_hash])
264         
265         class Node(p2p.Node):
266             def handle_shares(self, shares, peer):
267                 if len(shares) > 5:
268                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
269                 
270                 new_count = 0
271                 for share in shares:
272                     if share.hash in tracker.items:
273                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
274                         continue
275                     
276                     new_count += 1
277                     
278                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
279                     
280                     tracker.add(share)
281                 
282                 if new_count:
283                     set_best_share()
284                 
285                 if len(shares) > 5:
286                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
287             
288             @defer.inlineCallbacks
289             def handle_share_hashes(self, hashes, peer):
290                 new_hashes = [x for x in hashes if x not in tracker.items]
291                 if not new_hashes:
292                     return
293                 try:
294                     shares = yield peer.get_shares(
295                         hashes=new_hashes,
296                         parents=0,
297                         stops=[],
298                     )
299                 except:
300                     log.err(None, 'in handle_share_hashes:')
301                 else:
302                     self.handle_shares(shares, peer)
303             
304             def handle_get_shares(self, hashes, parents, stops, peer):
305                 parents = min(parents, 1000//len(hashes))
306                 stops = set(stops)
307                 shares = []
308                 for share_hash in hashes:
309                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
310                         if share.hash in stops:
311                             break
312                         shares.append(share)
313                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
314                 return shares
315             
316             def handle_bestblock(self, header, peer):
317                 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
318                     raise p2p.PeerMisbehavingError('received block header fails PoW test')
319                 handle_header(header)
320         
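        # Block submission: push a solved block to bitcoind both over the P2P
        # connection (send_block) and over RPC (submitblock when running on
        # getblocktemplate, getmemorypool otherwise), warning when the reported
        # result disagrees with whether the header actually meets the target.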
321         @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
322         def submit_block_p2p(block):
323             if factory.conn.value is None:
324                 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
325                 raise deferral.RetrySilentlyException()
326             factory.conn.value.send_block(block=block)
327         
328         @deferral.retry('Error submitting block: (will retry)', 10, 10)
329         @defer.inlineCallbacks
330         def submit_block_rpc(block, ignore_failure):
331             if bitcoind_work.value['use_getblocktemplate']:
332                 result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
333                 success = result is None
334             else:
335                 result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
336                 success = result
337             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
338             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
339                 print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
340         
341         def submit_block(block, ignore_failure):
342             submit_block_p2p(block)
343             submit_block_rpc(block, ignore_failure)
344         
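        # When a newly verified share also satisfies the bitcoin block target,
        # rebuild the full block from known transactions, submit it, and
        # rebroadcast the share so the rest of the pool learns of the block
        # quickly (spread() is retried after 5s once height info has updated).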
345         @tracker.verified.added.watch
346         def _(share):
347             if share.pow_hash <= share.header['bits'].target:
348                 block = share.as_block(tracker, known_txs_var.value)
349                 if block is None:
350                     print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
351                     return
352                 submit_block(block, ignore_failure=True)
353                 print
354                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
355                 print
356                 def spread():
357                     if (get_height_rel_highest(share.header['previous_block']) > -5 or
358                         bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
359                         broadcast_share(share.hash)
360                 spread()
361                 reactor.callLater(5, spread) # so get_height_rel_highest can update
362         
363         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
364         
365         @defer.inlineCallbacks
366         def parse(x):
367             if ':' in x:
368                 ip, port = x.split(':')
369                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
370             else:
371                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
372         
373         addrs = {}
374         if os.path.exists(os.path.join(datadir_path, 'addrs')):
375             try:
376                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
377                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
378             except:
379                 print >>sys.stderr, 'error parsing addrs'
380         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
381             try:
382                 addr = yield addr_df
383                 if addr not in addrs:
384                     addrs[addr] = (0, time.time(), time.time())
385             except:
386                 log.err()
387         
388         connect_addrs = set()
389         for addr_df in map(parse, args.p2pool_nodes):
390             try:
391                 connect_addrs.add((yield addr_df))
392             except:
393                 log.err()
394         
395         p2p_node = Node(
396             best_share_hash_func=lambda: best_share_var.value,
397             port=args.p2pool_port,
398             net=net,
399             addr_store=addrs,
400             connect_addrs=connect_addrs,
401             max_incoming_conns=args.p2pool_conns,
402             traffic_happened=traffic_happened,
403             known_txs_var=known_txs_var,
404             mining_txs_var=mining_txs_var,
405         )
406         p2p_node.start()
407         
408         def save_addrs():
409             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
410                 f.write(json.dumps(p2p_node.addr_store.items()))
411         task.LoopingCall(save_addrs).start(60)
412         
413         @best_block_header.changed.watch
414         def _(header):
415             for peer in p2p_node.peers.itervalues():
416                 peer.send_bestblock(header=header)
417         
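        # Send a share, plus up to four not-yet-shared ancestors, to every peer
        # except the one it arrived from, bundling the transactions the head
        # share references so receivers need no extra round trips.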
418         @defer.inlineCallbacks
419         def broadcast_share(share_hash):
420             shares = []
421             for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
422                 if share.hash in shared_share_hashes:
423                     break
424                 shared_share_hashes.add(share.hash)
425                 shares.append(share)
426             
427             for peer in list(p2p_node.peers.itervalues()):
428                 yield peer.sendShares([share for share in shares if share.peer is not peer], tracker, known_txs_var.value, include_txs_with=[share_hash])
429         
430         # send share when the chain changes to their chain
431         best_share_var.changed.watch(broadcast_share)
432         
433         def save_shares():
434             for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
435                 ss.add_share(share)
436                 if share.hash in tracker.verified.items:
437                     ss.add_verified_hash(share.hash)
438         task.LoopingCall(save_shares).start(60)
439         
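        # Background downloader: whenever the tracker wants shares it does not
        # have (desired_var), request one of them plus up to 500 parents from a
        # random peer and feed the result through the normal share handler.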
440         @apply
441         @defer.inlineCallbacks
442         def download_shares():
443             while True:
444                 desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
445                 peer2, share_hash = random.choice(desired)
446                 
447                 if len(p2p_node.peers) == 0:
448                     yield deferral.sleep(1)
449                     continue
450                 peer = random.choice(p2p_node.peers.values())
451                 
452                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
453                 try:
454                     shares = yield peer.get_shares(
455                         hashes=[share_hash],
456                         parents=500,
457                         stops=[],
458                     )
459                 except:
460                     log.err(None, 'in download_shares:')
461                     continue
462                 
463                 if not shares:
464                     yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
465                     continue
466                 p2p_node.handle_shares(shares, peer)
467         
468         print '    ...success!'
469         print
470         
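        # Optionally keep a UPnP mapping for the p2pool port on the LAN gateway,
        # retrying at random intervals that average about two minutes.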
471         if args.upnp:
472             @defer.inlineCallbacks
473             def upnp_thread():
474                 while True:
475                     try:
476                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
477                         if is_lan:
478                             pm = yield portmapper.get_port_mapper()
479                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
480                     except defer.TimeoutError:
481                         pass
482                     except:
483                         if p2pool.DEBUG:
484                             log.err(None, 'UPnP error:')
485                     yield deferral.sleep(random.expovariate(1/120))
486             upnp_thread()
487         
488         # start listening for workers with a JSON-RPC server
489         
490         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
491         
492         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
493         
494         wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
495         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened)
496         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
497         
498         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
499         
500         with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
501             pass
502         
503         print '    ...success!'
504         print
505         
506         
507         # done!
508         print 'Started successfully!'
509         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
510         if args.donation_percentage > 0.51:
511             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
512         elif args.donation_percentage < 0.49:
513             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
514         else:
515             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
516             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
517         print
518         
519         
520         if hasattr(signal, 'SIGALRM'):
521             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
522                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
523             ))
524             signal.siginterrupt(signal.SIGALRM, False)
525             task.LoopingCall(signal.alarm, 30).start(1)
526         
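        # Optional IRC announcer: join the network's announce channel and, when
        # a verified share turns out to be a block, announce it after a short
        # random delay, skipping blocks already mentioned recently.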
527         if args.irc_announce:
528             from twisted.words.protocols import irc
529             class IRCClient(irc.IRCClient):
530                 nickname = 'p2pool%02i' % (random.randrange(100),)
531                 channel = net.ANNOUNCE_CHANNEL
532                 def lineReceived(self, line):
533                     if p2pool.DEBUG:
534                         print repr(line)
535                     irc.IRCClient.lineReceived(self, line)
536                 def signedOn(self):
537                     irc.IRCClient.signedOn(self)
538                     self.factory.resetDelay()
539                     self.join(self.channel)
540                     @defer.inlineCallbacks
541                     def new_share(share):
542                         if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
543                             yield deferral.sleep(random.expovariate(1/60))
544                             message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
545                             if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
546                                 self.say(self.channel, message)
547                                 self._remember_message(message)
548                     self.watch_id = tracker.verified.added.watch(new_share)
549                     self.recent_messages = []
550                 def _remember_message(self, message):
551                     self.recent_messages.append(message)
552                     while len(self.recent_messages) > 100:
553                         self.recent_messages.pop(0)
554                 def privmsg(self, user, channel, message):
555                     if channel == self.channel:
556                         self._remember_message(message)
557                 def connectionLost(self, reason):
558                     tracker.verified.added.unwatch(self.watch_id)
559                     print 'IRC connection lost:', reason.getErrorMessage()
560             class IRCClientFactory(protocol.ReconnectingClientFactory):
561                 protocol = IRCClient
562             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
563         
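        # Status loop: every 3 seconds build a summary (share-chain height,
        # peers, local and pool hash rates, stale rates, expected payout) and
        # print it whenever it changes or at least every 15 seconds.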
564         @defer.inlineCallbacks
565         def status_thread():
566             last_str = None
567             last_time = 0
568             while True:
569                 yield deferral.sleep(3)
570                 try:
571                     height = tracker.get_height(best_share_var.value)
572                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
573                         height,
574                         len(tracker.verified.items),
575                         len(tracker.items),
576                         len(p2p_node.peers),
577                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
578                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
579                     
580                     datums, dt = wb.local_rate_monitor.get_datums_in_last()
581                     my_att_s = sum(datum['work']/dt for datum in datums)
582                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
583                         math.format(int(my_att_s)),
584                         math.format_dt(dt),
585                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
586                         math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
587                     )
588                     
589                     if height > 2:
590                         (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
591                         stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
592                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
593                         
594                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
595                             shares, stale_orphan_shares, stale_doa_shares,
596                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
597                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
598                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
599                         )
600                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
601                             math.format(int(real_att_s)),
602                             100*stale_prop,
603                             math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
604                         )
605                         
606                         for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
607                             print >>sys.stderr, '#'*40
608                             print >>sys.stderr, '>>> Warning: ' + warning
609                             print >>sys.stderr, '#'*40
610                     
611                     if this_str != last_str or time.time() > last_time + 15:
612                         print this_str
613                         last_str = this_str
614                         last_time = time.time()
615                 except:
616                     log.err()
617         status_thread()
618     except:
619         reactor.stop()
620         log.err(None, 'Fatal error:')
621
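# Command-line front end: parse arguments, fill in missing bitcoind settings
# from bitcoin.conf, set up logging and the optional bug reporter, then hand
# off to main() under the Twisted reactor.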
622 def run():
623     realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
624     
625     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
626     parser.add_argument('--version', action='version', version=p2pool.__version__)
627     parser.add_argument('--net',
628         help='use specified network (default: bitcoin)',
629         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
630     parser.add_argument('--testnet',
631         help='''use the network's testnet''',
632         action='store_const', const=True, default=False, dest='testnet')
633     parser.add_argument('--debug',
634         help='enable debugging mode',
635         action='store_const', const=True, default=False, dest='debug')
636     parser.add_argument('-a', '--address',
637         help='generate payouts to this address (default: <address requested from bitcoind>)',
638         type=str, action='store', default=None, dest='address')
639     parser.add_argument('--datadir',
640         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
641         type=str, action='store', default=None, dest='datadir')
642     parser.add_argument('--logfile',
643         help='''log to this file (default: data/<NET>/log)''',
644         type=str, action='store', default=None, dest='logfile')
645     parser.add_argument('--merged',
646         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
647         type=str, action='append', default=[], dest='merged_urls')
648     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
649         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
650         type=float, action='store', default=0.5, dest='donation_percentage')
651     parser.add_argument('--iocp',
652         help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
653         action='store_true', default=False, dest='iocp')
654     parser.add_argument('--irc-announce',
655         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
656         action='store_true', default=False, dest='irc_announce')
657     parser.add_argument('--no-bugreport',
658         help='disable submitting caught exceptions to the author',
659         action='store_true', default=False, dest='no_bugreport')
660     
661     p2pool_group = parser.add_argument_group('p2pool interface')
662     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
663         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
664         type=int, action='store', default=None, dest='p2pool_port')
665     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
666         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
667         type=str, action='append', default=[], dest='p2pool_nodes')
668     parser.add_argument('--disable-upnp',
669         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
670         action='store_false', default=True, dest='upnp')
671     p2pool_group.add_argument('--max-conns', metavar='CONNS',
672         help='maximum incoming connections (default: 40)',
673         type=int, action='store', default=40, dest='p2pool_conns')
674     
675     worker_group = parser.add_argument_group('worker interface')
676     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
677         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
678         type=str, action='store', default=None, dest='worker_endpoint')
679     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
680         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
681         type=float, action='store', default=0, dest='worker_fee')
682     
683     bitcoind_group = parser.add_argument_group('bitcoind interface')
684     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
685         help='connect to this address (default: 127.0.0.1)',
686         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
687     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
688         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
689         type=int, action='store', default=None, dest='bitcoind_rpc_port')
690     bitcoind_group.add_argument('--bitcoind-rpc-ssl',
691         help='connect to JSON-RPC interface using SSL',
692         action='store_true', default=False, dest='bitcoind_rpc_ssl')
693     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
694         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
695         type=int, action='store', default=None, dest='bitcoind_p2p_port')
696     
697     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
698         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
699         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
700     
701     args = parser.parse_args()
702     
703     if args.debug:
704         p2pool.DEBUG = True
705         defer.setDebugging(True)
706     
707     net_name = args.net_name + ('_testnet' if args.testnet else '')
708     net = networks.nets[net_name]
709     
710     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
711     if not os.path.exists(datadir_path):
712         os.makedirs(datadir_path)
713     
714     if len(args.bitcoind_rpc_userpass) > 2:
715         parser.error('a maximum of two arguments is allowed')
716     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
717     
718     if args.bitcoind_rpc_password is None:
719         conf_path = net.PARENT.CONF_FILE_FUNC()
720         if not os.path.exists(conf_path):
721             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
722                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
723                 '''\r\n'''
724                 '''server=1\r\n'''
725                 '''rpcpassword=%x\r\n'''
726                 '''\r\n'''
727                 '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
728         conf = open(conf_path, 'rb').read()
729         contents = {}
730         for line in conf.splitlines(True):
731             if '#' in line:
732                 line = line[:line.index('#')]
733             if '=' not in line:
734                 continue
735             k, v = line.split('=', 1)
736             contents[k.strip()] = v.strip()
737         for conf_name, var_name, var_type in [
738             ('rpcuser', 'bitcoind_rpc_username', str),
739             ('rpcpassword', 'bitcoind_rpc_password', str),
740             ('rpcport', 'bitcoind_rpc_port', int),
741             ('port', 'bitcoind_p2p_port', int),
742         ]:
743             if getattr(args, var_name) is None and conf_name in contents:
744                 setattr(args, var_name, var_type(contents[conf_name]))
745         if args.bitcoind_rpc_password is None:
746             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
747     
748     if args.bitcoind_rpc_username is None:
749         args.bitcoind_rpc_username = ''
750     
751     if args.bitcoind_rpc_port is None:
752         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
753     
754     if args.bitcoind_p2p_port is None:
755         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
756     
757     if args.p2pool_port is None:
758         args.p2pool_port = net.P2P_PORT
759     
760     if args.worker_endpoint is None:
761         worker_endpoint = '', net.WORKER_PORT
762     elif ':' not in args.worker_endpoint:
763         worker_endpoint = '', int(args.worker_endpoint)
764     else:
765         addr, port = args.worker_endpoint.rsplit(':', 1)
766         worker_endpoint = addr, int(port)
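    # e.g. '-w 9332' listens on all interfaces as ('', 9332), while
    # '-w 127.0.0.1:9332' yields ('127.0.0.1', 9332).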
767     
768     if args.address is not None:
769         try:
770             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
771         except Exception, e:
772             parser.error('error parsing address: ' + repr(e))
773     else:
774         args.pubkey_hash = None
775     
776     def separate_url(url):
777         s = urlparse.urlsplit(url)
778         if '@' not in s.netloc:
779             parser.error('merged url netloc must contain an "@"')
780         userpass, new_netloc = s.netloc.rsplit('@', 1)
781         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
782     merged_urls = map(separate_url, args.merged_urls)
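    # e.g. the --merged example 'http://ncuser:ncpass@127.0.0.1:10332/' splits
    # into ('http://127.0.0.1:10332/', 'ncuser:ncpass').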
783     
784     if args.logfile is None:
785         args.logfile = os.path.join(datadir_path, 'log')
786     
787     logfile = logging.LogFile(args.logfile)
788     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
789     sys.stdout = logging.AbortPipe(pipe)
790     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
791     if hasattr(signal, "SIGUSR1"):
792         def sigusr1(signum, frame):
793             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
794             logfile.reopen()
795             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
796         signal.signal(signal.SIGUSR1, sigusr1)
797     task.LoopingCall(logfile.reopen).start(5)
798     
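    # Unless --no-bugreport is given, forward error-log events (at most one
    # every 5 seconds) to the author's collection endpoint together with the
    # p2pool version and network name.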
799     class ErrorReporter(object):
800         def __init__(self):
801             self.last_sent = None
802         
803         def emit(self, eventDict):
804             if not eventDict["isError"]:
805                 return
806             
807             if self.last_sent is not None and time.time() < self.last_sent + 5:
808                 return
809             self.last_sent = time.time()
810             
811             if 'failure' in eventDict:
812                 text = ((eventDict.get('why') or 'Unhandled Error')
813                     + '\n' + eventDict['failure'].getTraceback())
814             else:
815                 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
816             
817             from twisted.web import client
818             client.getPage(
819                 url='http://u.forre.st/p2pool_error.cgi',
820                 method='POST',
821                 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
822                 timeout=15,
823             ).addBoth(lambda x: None)
824     if not args.no_bugreport:
825         log.addObserver(ErrorReporter().emit)
826     
827     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
828     reactor.run()