# efficiency improvement - pause before passing transactions to bitcoind
# [p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import base64
4 import json
5 import os
6 import random
7 import sys
8 import time
9 import signal
10 import traceback
11 import urlparse
12
13 if '--iocp' in sys.argv:
14     from twisted.internet import iocpreactor
15     iocpreactor.install()
16 from twisted.internet import defer, reactor, protocol, task
17 from twisted.web import server
18 from twisted.python import log
19 from nattraverso import portmapper, ipdiscover
20
21 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
22 from bitcoin import worker_interface, height_tracker
23 from util import fixargparse, jsonrpc, variable, deferral, math, logging
24 from . import p2p, networks, web, work
25 import p2pool, p2pool.data as p2pool_data
26
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, use_getblocktemplate=False):
    """Fetch a block template from bitcoind over JSON-RPC and normalize it.

    Tries getblocktemplate or getmemorypool depending on use_getblocktemplate;
    if the chosen method is unknown to bitcoind (JSON-RPC error -32601), it
    toggles and tries the other one once. Returns (via Deferred) a dict with
    unpacked transactions, parsed previous-block hash and bits, plus
    bookkeeping fields (height, last_update, and which RPC ended up working).
    """
    def go():
        # go() reads the enclosing use_getblocktemplate at call time, so the
        # fallback toggle below changes which RPC the second attempt uses.
        if use_getblocktemplate:
            return bitcoind.rpc_getblocktemplate(dict(mode='template'))
        else:
            return bitcoind.rpc_getmemorypool()
    try:
        work = yield go()
    except jsonrpc.Error_for_code(-32601): # Method not found
        use_getblocktemplate = not use_getblocktemplate
        try:
            work = yield go()
        except jsonrpc.Error_for_code(-32601): # Method not found
            # neither RPC exists -- bitcoind too old to mine against
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    # getblocktemplate wraps each tx in a dict with a 'data' key;
    # getmemorypool returns bare hex strings
    packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
    if 'height' not in work:
        # getmemorypool omits height; derive it from the previous block's
        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    elif p2pool.DEBUG:
        assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    defer.returnValue(dict(
        version=work['version'],
        previous_block=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        transaction_hashes=map(bitcoin_data.hash256, packed_transactions),
        subsidy=work['coinbasevalue'],
        time=work['time'] if 'time' in work else work['curtime'], # field name differs between the two RPCs
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
        height=work['height'],
        last_update=time.time(),
        use_getblocktemplate=use_getblocktemplate,
    ))
62
63 @defer.inlineCallbacks
64 def main(args, net, datadir_path, merged_urls, worker_endpoint):
65     try:
66         print 'p2pool (version %s)' % (p2pool.__version__,)
67         print
68         
69         traffic_happened = variable.Event()
70         
        @defer.inlineCallbacks
        def connect_p2p():
            # connect to bitcoind over bitcoin-p2p; returns (via Deferred) the
            # connected ClientFactory once the protocol handshake completes
            print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
            factory = bitcoin_p2p.ClientFactory(net.PARENT)
            reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
            yield factory.getProtocol() # waits until handshake is successful
            print '    ...success!'
            print
            defer.returnValue(factory)
81         
82         if args.testnet: # establish p2p connection first if testnet so bitcoind can work without connections
83             factory = yield connect_p2p()
84         
85         # connect to bitcoind over JSON-RPC and do initial getmemorypool
86         url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
87         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
88         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            # verify we are talking to the right network's bitcoind and that it
            # is new enough; on failure, print a hint and retry via the
            # deferral.retry decorator (RetrySilentlyException suppresses the
            # decorator's own error message)
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
                print >>sys.stderr, '    Bitcoin version too old! Upgrade to 0.6.4 or newer!'
                raise deferral.RetrySilentlyException()
98         yield check()
99         temp_work = yield getwork(bitcoind)
100         
101         if not args.testnet:
102             factory = yield connect_p2p()
103         
104         block_height_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_height():
            # refresh the cached chain height (block_height_var) from bitcoind
            block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
108         yield poll_height()
109         task.LoopingCall(poll_height).start(60*60)
110         
111         bitcoind_warning_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_warnings():
            # surface bitcoind's getmininginfo 'errors' string; None when clear
            errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
            bitcoind_warning_var.set(errors if errors != '' else None)
116         yield poll_warnings()
117         task.LoopingCall(poll_warnings).start(20*60)
118         
119         print '    ...success!'
120         print '    Current block hash: %x' % (temp_work['previous_block'],)
121         print '    Current block height: %i' % (block_height_var.value,)
122         print
123         
124         print 'Determining payout address...'
125         if args.pubkey_hash is None:
126             address_path = os.path.join(datadir_path, 'cached_payout_address')
127             
128             if os.path.exists(address_path):
129                 with open(address_path, 'rb') as f:
130                     address = f.read().strip('\r\n')
131                 print '    Loaded cached address: %s...' % (address,)
132             else:
133                 address = None
134             
135             if address is not None:
136                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
137                 if not res['isvalid'] or not res['ismine']:
138                     print '    Cached address is either invalid or not controlled by local bitcoind!'
139                     address = None
140             
141             if address is None:
142                 print '    Getting payout address from bitcoind...'
143                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
144             
145             with open(address_path, 'wb') as f:
146                 f.write(address)
147             
148             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
149         else:
150             my_pubkey_hash = args.pubkey_hash
151         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
152         print
153         
154         my_share_hashes = set()
155         my_doa_share_hashes = set()
156         
157         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
158         shared_share_hashes = set()
159         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
160         known_verified = set()
161         print "Loading shares..."
162         for i, (mode, contents) in enumerate(ss.get_shares()):
163             if mode == 'share':
164                 if contents.hash in tracker.items:
165                     continue
166                 shared_share_hashes.add(contents.hash)
167                 contents.time_seen = 0
168                 tracker.add(contents)
169                 if len(tracker.items) % 1000 == 0 and tracker.items:
170                     print "    %i" % (len(tracker.items),)
171             elif mode == 'verified_hash':
172                 known_verified.add(contents)
173             else:
174                 raise AssertionError()
175         print "    ...inserting %i verified shares..." % (len(known_verified),)
176         for h in known_verified:
177             if h not in tracker.items:
178                 ss.forget_verified_share(h)
179                 continue
180             tracker.verified.add(tracker.items[h])
181         print "    ...done loading %i shares!" % (len(tracker.items),)
182         print
183         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
184         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
185         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
186         
187         print 'Initializing work...'
188         
189         
190         # BITCOIND WORK
191         
192         bitcoind_work = variable.Variable((yield getwork(bitcoind)))
        @defer.inlineCallbacks
        def work_poller():
            # endless loop keeping bitcoind_work fresh: re-polls getwork when
            # the p2p connection signals a new block, or after 15 seconds,
            # whichever comes first; errors are logged and the loop continues
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
202         work_poller()
203         
204         # PEER WORK
205         
206         best_block_header = variable.Variable(None)
        def handle_header(new_header):
            # Possibly adopt new_header into best_block_header. It is accepted
            # only if it meets the current network target AND one of:
            #   - we have no header yet,
            #   - it is a child of bitcoind's best block while the stored
            #     header IS that best block, or
            #   - it IS bitcoind's best block while the stored header is not
            #     a child of it.
            # check that header matches current target
            if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
                return
            bitcoind_best_block = bitcoind_work.value['previous_block']
            if (best_block_header.value is None
                or (
                    new_header['previous_block'] == bitcoind_best_block and
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
                ) # new is child of current and previous is current
                or (
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
                    best_block_header.value['previous_block'] != bitcoind_best_block
                )): # new is current and previous is not a child of current
                best_block_header.set(new_header)
        @defer.inlineCallbacks
        def poll_header():
            # fetch (over p2p) the header of bitcoind's current best block and
            # feed it through handle_header
            handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
225         bitcoind_work.changed.watch(lambda _: poll_header())
226         yield deferral.retry('Error while requesting best block header:')(poll_header)()
227         
228         # BEST SHARE
229         
230         known_txs_var = variable.Variable({}) # hash -> tx
231         mining_txs_var = variable.Variable({}) # hash -> tx
232         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
233         
234         best_share_var = variable.Variable(None)
235         desired_var = variable.Variable(None)
        def set_best_share():
            # re-evaluate the share chain against current bitcoind work and
            # publish the new best share hash and the list of desired shares
            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'], known_txs_var.value)
            
            best_share_var.set(best)
            desired_var.set(desired)
241         bitcoind_work.changed.watch(lambda _: set_best_share())
242         set_best_share()
243         
244         print '    ...success!'
245         print
246         
247         # setup p2p logic and join p2pool network
248         
249         # update mining_txs according to getwork results
        # update mining_txs according to getwork results
        @bitcoind_work.changed.run_and_watch
        def _(_=None):
            # mining_txs is replaced wholesale by the current template's txs;
            # known_txs only accumulates here (it is pruned elsewhere)
            new_mining_txs = {}
            new_known_txs = dict(known_txs_var.value)
            for tx_hash, tx in zip(bitcoind_work.value['transaction_hashes'], bitcoind_work.value['transactions']):
                new_mining_txs[tx_hash] = tx
                new_known_txs[tx_hash] = tx
            mining_txs_var.set(new_mining_txs)
            known_txs_var.set(new_known_txs)
259         # add p2p transactions from bitcoind to known_txs
        # add p2p transactions from bitcoind to known_txs
        @factory.new_tx.watch
        def _(tx):
            # key the tx by its double-SHA256 hash of the packed form
            new_known_txs = dict(known_txs_var.value)
            new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
            known_txs_var.set(new_known_txs)
265         # forward transactions seen to bitcoind
        # forward transactions seen to bitcoind
        @known_txs_var.transitioned.watch
        @defer.inlineCallbacks
        def _(before, after):
            # pause briefly (exponentially distributed, mean 1s) before
            # relaying, so bursts of incoming txs are sent to bitcoind in one
            # batch instead of one call per tx
            yield deferral.sleep(random.expovariate(1/1))
            for tx_hash in set(after) - set(before):
                factory.conn.value.send_tx(tx=after[tx_hash])
272         
273         class Node(p2p.Node):
274             def handle_shares(self, shares, peer):
275                 if len(shares) > 5:
276                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
277                 
278                 new_count = 0
279                 for share in shares:
280                     if share.hash in tracker.items:
281                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
282                         continue
283                     
284                     new_count += 1
285                     
286                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
287                     
288                     tracker.add(share)
289                 
290                 if new_count:
291                     set_best_share()
292                 
293                 if len(shares) > 5:
294                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
295             
296             @defer.inlineCallbacks
297             def handle_share_hashes(self, hashes, peer):
298                 new_hashes = [x for x in hashes if x not in tracker.items]
299                 if not new_hashes:
300                     return
301                 try:
302                     shares = yield peer.get_shares(
303                         hashes=new_hashes,
304                         parents=0,
305                         stops=[],
306                     )
307                 except:
308                     log.err(None, 'in handle_share_hashes:')
309                 else:
310                     self.handle_shares(shares, peer)
311             
312             def handle_get_shares(self, hashes, parents, stops, peer):
313                 parents = min(parents, 1000//len(hashes))
314                 stops = set(stops)
315                 shares = []
316                 for share_hash in hashes:
317                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
318                         if share.hash in stops:
319                             break
320                         shares.append(share)
321                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
322                 return shares
323             
324             def handle_bestblock(self, header, peer):
325                 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
326                     raise p2p.PeerMisbehavingError('received block header fails PoW test')
327                 handle_header(header)
328         
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            # push a solved block to bitcoind over the bitcoin-p2p connection;
            # if the connection is down, complain and let deferral.retry try
            # again later
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
335         
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            # submit via whichever RPC the last getwork succeeded with:
            # submitblock signals success by returning None, getmemorypool by
            # returning a truthy result
            if bitcoind_work.value['use_getblocktemplate']:
                result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result is None
            else:
                result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            # warn whenever the outcome disagrees with what PoW says it should be
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
348         
        def submit_block(block, ignore_failure):
            # submit through both channels: the p2p connection and JSON-RPC
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
352         
        @tracker.verified.added.watch
        def _(share):
            # when a newly-verified share also solves a bitcoin block, submit
            # it to bitcoind and rebroadcast the share to peers
            if share.pow_hash <= share.header['bits'].target:
                block = share.as_block(tracker, known_txs_var.value)
                if block is None:
                    # we do not know all the block's transactions, so it cannot
                    # be reassembled for submission
                    print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                    return
                submit_block(block, ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    # only rebroadcast if the share builds on a recent block
                    # (or on/at bitcoind's current tip)
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
370         
371         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
372         
        @defer.inlineCallbacks
        def parse(x):
            # resolve 'host' or 'host:port' into an (ip, port) tuple,
            # defaulting the port to the network's standard p2p port
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
380         
381         addrs = {}
382         if os.path.exists(os.path.join(datadir_path, 'addrs')):
383             try:
384                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
385                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
386             except:
387                 print >>sys.stderr, 'error parsing addrs'
388         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
389             try:
390                 addr = yield addr_df
391                 if addr not in addrs:
392                     addrs[addr] = (0, time.time(), time.time())
393             except:
394                 log.err()
395         
396         connect_addrs = set()
397         for addr_df in map(parse, args.p2pool_nodes):
398             try:
399                 connect_addrs.add((yield addr_df))
400             except:
401                 log.err()
402         
403         p2p_node = Node(
404             best_share_hash_func=lambda: best_share_var.value,
405             port=args.p2pool_port,
406             net=net,
407             addr_store=addrs,
408             connect_addrs=connect_addrs,
409             max_incoming_conns=args.p2pool_conns,
410             traffic_happened=traffic_happened,
411             known_txs_var=known_txs_var,
412             mining_txs_var=mining_txs_var,
413         )
414         p2p_node.start()
415         
        def forget_old_txs():
            # prune known_txs down to what is still referenced: txs peers are
            # remembering, txs in the current mining template, and txs
            # introduced by the last (up to) 120 shares of the best chain
            new_known_txs = {}
            for peer in p2p_node.peers.itervalues():
                new_known_txs.update(peer.remembered_txs)
            new_known_txs.update(mining_txs_var.value)
            for share in tracker.get_chain(best_share_var.value, min(120, tracker.get_height(best_share_var.value))):
                for tx_hash in share.new_transaction_hashes:
                    if tx_hash in known_txs_var.value:
                        new_known_txs[tx_hash] = known_txs_var.value[tx_hash]
            known_txs_var.set(new_known_txs)
426         task.LoopingCall(forget_old_txs).start(10)
427         
        def save_addrs():
            # persist the peer address store as JSON so it survives restarts
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
431         task.LoopingCall(save_addrs).start(60)
432         
        @best_block_header.changed.watch
        def _(header):
            # relay the new best block header to every connected peer
            for peer in p2p_node.peers.itervalues():
                peer.send_bestblock(header=header)
437         
        @defer.inlineCallbacks
        def broadcast_share(share_hash):
            # send up to 5 not-yet-shared ancestors of share_hash to every
            # peer, skipping each share's original sender; stops at the first
            # ancestor already marked shared
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in list(p2p_node.peers.itervalues()):
                yield peer.sendShares([share for share in shares if share.peer is not peer], tracker, known_txs_var.value, include_txs_with=[share_hash])
449         
450         # send share when the chain changes to their chain
451         best_share_var.changed.watch(broadcast_share)
452         
        def save_shares():
            # write the recent share chain (and verified-hash markers) to the
            # on-disk ShareStore, up to twice the chain length
            for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.items:
                    ss.add_verified_hash(share.hash)
458         task.LoopingCall(save_shares).start(60)
459         
        @apply
        @defer.inlineCallbacks
        def download_shares():
            # background loop (started immediately via @apply): whenever the
            # tracker wants ancestors it does not have, fetch them -- with up
            # to 500 parents per request -- from a randomly chosen peer
            while True:
                desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
                peer2, share_hash = random.choice(desired)
                
                if len(p2p_node.peers) == 0:
                    yield deferral.sleep(1)
                    continue
                peer = random.choice(p2p_node.peers.values())
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                try:
                    shares = yield peer.get_shares(
                        hashes=[share_hash],
                        parents=500,
                        stops=[],
                    )
                except:
                    log.err(None, 'in download_shares:')
                    continue
                
                if not shares:
                    yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
                    continue
                p2p_node.handle_shares(shares, peer)
487         
488         print '    ...success!'
489         print
490         
491         if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                # periodically (exponential delay, mean ~120s) re-establish the
                # UPnP mapping of the p2pool port on the LAN gateway
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass # mapping attempt timed out; try again next cycle
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
506             upnp_thread()
507         
508         # start listening for workers with a JSON-RPC server
509         
510         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
511         
512         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
513         
514         wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
515         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened)
516         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
517         
518         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
519         
520         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
521             pass
522         
523         print '    ...success!'
524         print
525         
526         
527         # done!
528         print 'Started successfully!'
529         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
530         if args.donation_percentage > 0.51:
531             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
532         elif args.donation_percentage < 0.49:
533             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
534         else:
535             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
536             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
537         print
538         
539         
540         if hasattr(signal, 'SIGALRM'):
541             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
542                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
543             ))
544             signal.siginterrupt(signal.SIGALRM, False)
545             task.LoopingCall(signal.alarm, 30).start(1)
546         
547         if args.irc_announce:
548             from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                # Announces blocks found by this pool on the network's IRC
                # channel, deduplicating against recently seen messages.
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        # only announce fresh (within 10 min) shares that solve
                        # a block; sleep a random delay (mean 60s) first --
                        # presumably so multiple nodes stagger their
                        # announcements and the dedup check below can suppress
                        # repeats (TODO confirm intent)
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    # keep a bounded window (100) of recent messages for dedup
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    # stop watching for shares so this dead client is not kept
                    # alive by the tracker's watcher list
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
582             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
583         
        # Periodically print a status summary (chain height, peers, local and
        # pool hash rates, stale/efficiency stats, warnings). Re-prints only
        # when the text changes or 15 seconds have elapsed, to keep logs quiet.
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    height = tracker.get_height(best_share_var.value)
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.items),
                        len(tracker.items),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    # Local hash rate estimated from recent work datums.
                    datums, dt = wb.local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        # Expected share time = share difficulty / local rate; '???'
                        # until we have both a rate and a best share.
                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                    )
                    
                    # Pool-wide stats need a few shares of history to be meaningful.
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
                        # Divide by (1 - stale_prop) to recover the true attempt
                        # rate from the observed (stale-discounted) share rate.
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            # Efficiency: local stale rate relative to the pool average.
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
                        )
                        
                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    # Only emit when something changed or 15s have passed.
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    # Status reporting must never kill the node; log and continue.
                    log.err()
        status_thread()
    except:
        # Any failure during startup is fatal: stop the reactor so the
        # process exits, then log the traceback.
        reactor.stop()
        log.err(None, 'Fatal error:')
641
642 def run():
643     realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
644     
645     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
646     parser.add_argument('--version', action='version', version=p2pool.__version__)
647     parser.add_argument('--net',
648         help='use specified network (default: bitcoin)',
649         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
650     parser.add_argument('--testnet',
651         help='''use the network's testnet''',
652         action='store_const', const=True, default=False, dest='testnet')
653     parser.add_argument('--debug',
654         help='enable debugging mode',
655         action='store_const', const=True, default=False, dest='debug')
656     parser.add_argument('-a', '--address',
657         help='generate payouts to this address (default: <address requested from bitcoind>)',
658         type=str, action='store', default=None, dest='address')
659     parser.add_argument('--datadir',
660         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
661         type=str, action='store', default=None, dest='datadir')
662     parser.add_argument('--logfile',
663         help='''log to this file (default: data/<NET>/log)''',
664         type=str, action='store', default=None, dest='logfile')
665     parser.add_argument('--merged',
666         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
667         type=str, action='append', default=[], dest='merged_urls')
668     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
669         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
670         type=float, action='store', default=0.5, dest='donation_percentage')
671     parser.add_argument('--iocp',
672         help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
673         action='store_true', default=False, dest='iocp')
674     parser.add_argument('--irc-announce',
675         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
676         action='store_true', default=False, dest='irc_announce')
677     parser.add_argument('--no-bugreport',
678         help='disable submitting caught exceptions to the author',
679         action='store_true', default=False, dest='no_bugreport')
680     
681     p2pool_group = parser.add_argument_group('p2pool interface')
682     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
683         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
684         type=int, action='store', default=None, dest='p2pool_port')
685     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
686         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
687         type=str, action='append', default=[], dest='p2pool_nodes')
688     parser.add_argument('--disable-upnp',
689         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
690         action='store_false', default=True, dest='upnp')
691     p2pool_group.add_argument('--max-conns', metavar='CONNS',
692         help='maximum incoming connections (default: 40)',
693         type=int, action='store', default=40, dest='p2pool_conns')
694     
695     worker_group = parser.add_argument_group('worker interface')
696     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
697         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
698         type=str, action='store', default=None, dest='worker_endpoint')
699     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
700         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
701         type=float, action='store', default=0, dest='worker_fee')
702     
703     bitcoind_group = parser.add_argument_group('bitcoind interface')
704     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
705         help='connect to this address (default: 127.0.0.1)',
706         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
707     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
708         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
709         type=int, action='store', default=None, dest='bitcoind_rpc_port')
710     bitcoind_group.add_argument('--bitcoind-rpc-ssl',
711         help='connect to JSON-RPC interface using SSL',
712         action='store_true', default=False, dest='bitcoind_rpc_ssl')
713     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
714         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
715         type=int, action='store', default=None, dest='bitcoind_p2p_port')
716     
717     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
718         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
719         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
720     
721     args = parser.parse_args()
722     
723     if args.debug:
724         p2pool.DEBUG = True
725         defer.setDebugging(True)
726     
727     net_name = args.net_name + ('_testnet' if args.testnet else '')
728     net = networks.nets[net_name]
729     
730     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
731     if not os.path.exists(datadir_path):
732         os.makedirs(datadir_path)
733     
734     if len(args.bitcoind_rpc_userpass) > 2:
735         parser.error('a maximum of two arguments are allowed')
736     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
737     
738     if args.bitcoind_rpc_password is None:
739         conf_path = net.PARENT.CONF_FILE_FUNC()
740         if not os.path.exists(conf_path):
741             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
742                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
743                 '''\r\n'''
744                 '''server=1\r\n'''
745                 '''rpcpassword=%x\r\n'''
746                 '''\r\n'''
747                 '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
748         conf = open(conf_path, 'rb').read()
749         contents = {}
750         for line in conf.splitlines(True):
751             if '#' in line:
752                 line = line[:line.index('#')]
753             if '=' not in line:
754                 continue
755             k, v = line.split('=', 1)
756             contents[k.strip()] = v.strip()
757         for conf_name, var_name, var_type in [
758             ('rpcuser', 'bitcoind_rpc_username', str),
759             ('rpcpassword', 'bitcoind_rpc_password', str),
760             ('rpcport', 'bitcoind_rpc_port', int),
761             ('port', 'bitcoind_p2p_port', int),
762         ]:
763             if getattr(args, var_name) is None and conf_name in contents:
764                 setattr(args, var_name, var_type(contents[conf_name]))
765         if args.bitcoind_rpc_password is None:
766             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
767     
768     if args.bitcoind_rpc_username is None:
769         args.bitcoind_rpc_username = ''
770     
771     if args.bitcoind_rpc_port is None:
772         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
773     
774     if args.bitcoind_p2p_port is None:
775         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
776     
777     if args.p2pool_port is None:
778         args.p2pool_port = net.P2P_PORT
779     
780     if args.worker_endpoint is None:
781         worker_endpoint = '', net.WORKER_PORT
782     elif ':' not in args.worker_endpoint:
783         worker_endpoint = '', int(args.worker_endpoint)
784     else:
785         addr, port = args.worker_endpoint.rsplit(':', 1)
786         worker_endpoint = addr, int(port)
787     
788     if args.address is not None:
789         try:
790             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
791         except Exception, e:
792             parser.error('error parsing address: ' + repr(e))
793     else:
794         args.pubkey_hash = None
795     
796     def separate_url(url):
797         s = urlparse.urlsplit(url)
798         if '@' not in s.netloc:
799             parser.error('merged url netloc must contain an "@"')
800         userpass, new_netloc = s.netloc.rsplit('@', 1)
801         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
802     merged_urls = map(separate_url, args.merged_urls)
803     
804     if args.logfile is None:
805         args.logfile = os.path.join(datadir_path, 'log')
806     
807     logfile = logging.LogFile(args.logfile)
808     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
809     sys.stdout = logging.AbortPipe(pipe)
810     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
811     if hasattr(signal, "SIGUSR1"):
812         def sigusr1(signum, frame):
813             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
814             logfile.reopen()
815             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
816         signal.signal(signal.SIGUSR1, sigusr1)
817     task.LoopingCall(logfile.reopen).start(5)
818     
819     class ErrorReporter(object):
820         def __init__(self):
821             self.last_sent = None
822         
823         def emit(self, eventDict):
824             if not eventDict["isError"]:
825                 return
826             
827             if self.last_sent is not None and time.time() < self.last_sent + 5:
828                 return
829             self.last_sent = time.time()
830             
831             if 'failure' in eventDict:
832                 text = ((eventDict.get('why') or 'Unhandled Error')
833                     + '\n' + eventDict['failure'].getTraceback())
834             else:
835                 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
836             
837             from twisted.web import client
838             client.getPage(
839                 url='http://u.forre.st/p2pool_error.cgi',
840                 method='POST',
841                 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
842                 timeout=15,
843             ).addBoth(lambda x: None)
844     if not args.no_bugreport:
845         log.addObserver(ErrorReporter().emit)
846     
847     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
848     reactor.run()