# p2pool/main.py (merge of branch 'tx_preforwarding')
1 from __future__ import division
2
3 import base64
4 import json
5 import os
6 import random
7 import sys
8 import time
9 import signal
10 import traceback
11 import urlparse
12
13 if '--iocp' in sys.argv:
14     from twisted.internet import iocpreactor
15     iocpreactor.install()
16 from twisted.internet import defer, reactor, protocol, task
17 from twisted.web import server
18 from twisted.python import log
19 from nattraverso import portmapper, ipdiscover
20
21 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
22 from bitcoin import worker_interface, height_tracker
23 from util import fixargparse, jsonrpc, variable, deferral, math, logging
24 from . import p2p, networks, web, work
25 import p2pool, p2pool.data as p2pool_data
26
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, use_getblocktemplate=False):
    """Fetch a block template from bitcoind over JSON-RPC and normalize it.

    Calls getblocktemplate or getmemorypool depending on
    use_getblocktemplate; if the chosen method is unknown to bitcoind
    (error -32601), flips to the other method before giving up. Fires
    (via defer.returnValue) with a dict of parsed work fields, including
    the flag value that ultimately worked so callers can reuse it.
    """
    def go():
        # pick the RPC method based on the current flag value
        if use_getblocktemplate:
            return bitcoind.rpc_getblocktemplate(dict(mode='template'))
        else:
            return bitcoind.rpc_getmemorypool()
    try:
        work = yield go()
    except jsonrpc.Error_for_code(-32601): # Method not found
        # preferred method unsupported -- fall back to the other one
        use_getblocktemplate = not use_getblocktemplate
        try:
            work = yield go()
        except jsonrpc.Error_for_code(-32601): # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    # transactions may come as raw hex strings or as dicts with a 'data' field
    packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
    if 'height' not in work:
        # getmemorypool does not report height; derive it from the parent block
        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    elif p2pool.DEBUG:
        assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    defer.returnValue(dict(
        version=work['version'],
        previous_block=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        transaction_hashes=map(bitcoin_data.hash256, packed_transactions),
        subsidy=work['coinbasevalue'],
        # getblocktemplate uses 'curtime', getmemorypool uses 'time'
        time=work['time'] if 'time' in work else work['curtime'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
        height=work['height'],
        last_update=time.time(),
        use_getblocktemplate=use_getblocktemplate,
    ))
62
63 @defer.inlineCallbacks
64 def main(args, net, datadir_path, merged_urls, worker_endpoint):
65     try:
66         print 'p2pool (version %s)' % (p2pool.__version__,)
67         print
68         
69         traffic_happened = variable.Event()
70         
        @defer.inlineCallbacks
        def connect_p2p():
            """Connect to bitcoind over the bitcoin P2P protocol.

            Fires (via defer.returnValue) with the ClientFactory once the
            protocol handshake has completed successfully.
            """
            # connect to bitcoind over bitcoin-p2p
            print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
            factory = bitcoin_p2p.ClientFactory(net.PARENT)
            reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
            yield factory.getProtocol() # waits until handshake is successful
            print '    ...success!'
            print
            defer.returnValue(factory)
81         
82         if args.testnet: # establish p2p connection first if testnet so bitcoind can work without connections
83             factory = yield connect_p2p()
84         
85         # connect to bitcoind over JSON-RPC and do initial getmemorypool
86         url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
87         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
88         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            """Verify the JSON-RPC connection points at a compatible bitcoind.

            Retries (silently) until the network-specific RPC check passes
            and the reported bitcoind version satisfies net.VERSION_CHECK.
            """
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
                print >>sys.stderr, '    Bitcoin version too old! Upgrade to 0.6.4 or newer!'
                raise deferral.RetrySilentlyException()
98         yield check()
99         temp_work = yield getwork(bitcoind)
100         
101         if not args.testnet:
102             factory = yield connect_p2p()
103         
104         block_height_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_height():
            # refresh block_height_var from bitcoind, retrying on RPC errors
            block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
108         yield poll_height()
109         task.LoopingCall(poll_height).start(60*60)
110         
111         bitcoind_warning_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_warnings():
            # mirror bitcoind's getmininginfo 'errors' field into
            # bitcoind_warning_var (None when bitcoind reports no warnings)
            errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
            bitcoind_warning_var.set(errors if errors != '' else None)
116         yield poll_warnings()
117         task.LoopingCall(poll_warnings).start(20*60)
118         
119         print '    ...success!'
120         print '    Current block hash: %x' % (temp_work['previous_block'],)
121         print '    Current block height: %i' % (block_height_var.value,)
122         print
123         
124         print 'Determining payout address...'
125         if args.pubkey_hash is None:
126             address_path = os.path.join(datadir_path, 'cached_payout_address')
127             
128             if os.path.exists(address_path):
129                 with open(address_path, 'rb') as f:
130                     address = f.read().strip('\r\n')
131                 print '    Loaded cached address: %s...' % (address,)
132             else:
133                 address = None
134             
135             if address is not None:
136                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
137                 if not res['isvalid'] or not res['ismine']:
138                     print '    Cached address is either invalid or not controlled by local bitcoind!'
139                     address = None
140             
141             if address is None:
142                 print '    Getting payout address from bitcoind...'
143                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
144             
145             with open(address_path, 'wb') as f:
146                 f.write(address)
147             
148             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
149         else:
150             my_pubkey_hash = args.pubkey_hash
151         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
152         print
153         
154         my_share_hashes = set()
155         my_doa_share_hashes = set()
156         
157         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
158         shared_share_hashes = set()
159         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
160         known_verified = set()
161         print "Loading shares..."
162         for i, (mode, contents) in enumerate(ss.get_shares()):
163             if mode == 'share':
164                 if contents.hash in tracker.items:
165                     continue
166                 shared_share_hashes.add(contents.hash)
167                 contents.time_seen = 0
168                 tracker.add(contents)
169                 if len(tracker.items) % 1000 == 0 and tracker.items:
170                     print "    %i" % (len(tracker.items),)
171             elif mode == 'verified_hash':
172                 known_verified.add(contents)
173             else:
174                 raise AssertionError()
175         print "    ...inserting %i verified shares..." % (len(known_verified),)
176         for h in known_verified:
177             if h not in tracker.items:
178                 ss.forget_verified_share(h)
179                 continue
180             tracker.verified.add(tracker.items[h])
181         print "    ...done loading %i shares!" % (len(tracker.items),)
182         print
183         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
184         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
185         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
186         
187         print 'Initializing work...'
188         
189         
190         # BITCOIND WORK
191         
192         bitcoind_work = variable.Variable((yield getwork(bitcoind)))
        @defer.inlineCallbacks
        def work_poller():
            """Keep bitcoind_work fresh forever.

            Re-polls getwork whenever bitcoind announces a new block over
            P2P, or after 15 seconds, whichever comes first. Errors are
            logged and the loop continues.
            """
            while True:
                # grab the new-block deferred *before* polling so a block
                # arriving during the poll still triggers an immediate re-poll
                flag = factory.new_block.get_deferred()
                try:
                    bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
203         
204         # PEER WORK
205         
206         best_block_header = variable.Variable(None)
207         def handle_header(new_header):
208             # check that header matches current target
209             if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
210                 return
211             bitcoind_best_block = bitcoind_work.value['previous_block']
212             if (best_block_header.value is None
213                 or (
214                     new_header['previous_block'] == bitcoind_best_block and
215                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
216                 ) # new is child of current and previous is current
217                 or (
218                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
219                     best_block_header.value['previous_block'] != bitcoind_best_block
220                 )): # new is current and previous is not a child of current
221                 best_block_header.set(new_header)
        @defer.inlineCallbacks
        def poll_header():
            # ask bitcoind (over P2P) for the header of its current best
            # block and feed it through handle_header
            handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
225         bitcoind_work.changed.watch(lambda _: poll_header())
226         yield deferral.retry('Error while requesting best block header:')(poll_header)()
227         
228         # BEST SHARE
229         
230         known_txs_var = variable.Variable({}) # hash -> tx
231         mining_txs_var = variable.Variable({}) # hash -> tx
232         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
233         
234         best_share_var = variable.Variable(None)
235         desired_var = variable.Variable(None)
        def set_best_share():
            # re-evaluate the share chain and publish both the new best share
            # hash and the list of desired (peer addr, share hash) downloads
            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'], known_txs_var.value)
            
            best_share_var.set(best)
            desired_var.set(desired)
241         bitcoind_work.changed.watch(lambda _: set_best_share())
242         set_best_share()
243         
244         print '    ...success!'
245         print
246         
247         # setup p2p logic and join p2pool network
248         
249         # update mining_txs according to getwork results
        @bitcoind_work.changed.run_and_watch
        def _(_=None):
            # update mining_txs according to getwork results: mining_txs is
            # rebuilt from scratch (only the current template's transactions),
            # while known_txs only ever grows here -- forget_old_txs prunes it
            new_mining_txs = {}
            new_known_txs = dict(known_txs_var.value)
            for tx_hash, tx in zip(bitcoind_work.value['transaction_hashes'], bitcoind_work.value['transactions']):
                new_mining_txs[tx_hash] = tx
                new_known_txs[tx_hash] = tx
            mining_txs_var.set(new_mining_txs)
            known_txs_var.set(new_known_txs)
259         # add p2p transactions from bitcoind to known_txs
        @factory.new_tx.watch
        def _(tx):
            # add p2p transactions heard from bitcoind to known_txs,
            # keyed by their hash
            new_known_txs = dict(known_txs_var.value)
            new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
            known_txs_var.set(new_known_txs)
265         # forward transactions seen to bitcoind
        @known_txs_var.transitioned.watch
        @defer.inlineCallbacks
        def _(before, after):
            # forward transactions newly seen (from peers) to bitcoind, after
            # a small random delay (mean 1s) to stagger the sends
            yield deferral.sleep(random.expovariate(1/1))
            for tx_hash in set(after) - set(before):
                factory.conn.value.send_tx(tx=after[tx_hash])
272         
273         class Node(p2p.Node):
274             def handle_shares(self, shares, peer):
275                 if len(shares) > 5:
276                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
277                 
278                 new_count = 0
279                 for share in shares:
280                     if share.hash in tracker.items:
281                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
282                         continue
283                     
284                     new_count += 1
285                     
286                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
287                     
288                     tracker.add(share)
289                 
290                 if new_count:
291                     set_best_share()
292                 
293                 if len(shares) > 5:
294                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
295             
296             @defer.inlineCallbacks
297             def handle_share_hashes(self, hashes, peer):
298                 new_hashes = [x for x in hashes if x not in tracker.items]
299                 if not new_hashes:
300                     return
301                 try:
302                     shares = yield peer.get_shares(
303                         hashes=new_hashes,
304                         parents=0,
305                         stops=[],
306                     )
307                 except:
308                     log.err(None, 'in handle_share_hashes:')
309                 else:
310                     self.handle_shares(shares, peer)
311             
312             def handle_get_shares(self, hashes, parents, stops, peer):
313                 parents = min(parents, 1000//len(hashes))
314                 stops = set(stops)
315                 shares = []
316                 for share_hash in hashes:
317                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
318                         if share.hash in stops:
319                             break
320                         shares.append(share)
321                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
322                 return shares
323             
324             def handle_bestblock(self, header, peer):
325                 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
326                     raise p2p.PeerMisbehavingError('received block header fails PoW test')
327                 handle_header(header)
328         
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            # broadcast a solved block to bitcoind over the P2P connection;
            # retries silently while the connection is down
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
335         
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            # submit a solved block over JSON-RPC, using whichever method the
            # work polling currently uses (submitblock vs getmemorypool), and
            # warn when the result disagrees with our own PoW check
            if bitcoind_work.value['use_getblocktemplate']:
                result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result is None # submitblock returns null on success
            else:
                result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
348         
        def submit_block(block, ignore_failure):
            # submit via both channels: the P2P connection and JSON-RPC
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
352         
        @tracker.verified.added.watch
        def _(share):
            # when a verified share actually solves a block, pass the block
            # to bitcoind and (re-)broadcast the share to peers
            if share.pow_hash <= share.header['bits'].target:
                block = share.as_block(tracker, known_txs_var.value)
                if block is None:
                    print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                    return
                submit_block(block, ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    # only rebroadcast while the block is at or near the tip
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
370         
371         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
372         
373         @defer.inlineCallbacks
374         def parse(x):
375             if ':' in x:
376                 ip, port = x.split(':')
377                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
378             else:
379                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
380         
381         addrs = {}
382         if os.path.exists(os.path.join(datadir_path, 'addrs')):
383             try:
384                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
385                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
386             except:
387                 print >>sys.stderr, 'error parsing addrs'
388         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
389             try:
390                 addr = yield addr_df
391                 if addr not in addrs:
392                     addrs[addr] = (0, time.time(), time.time())
393             except:
394                 log.err()
395         
396         connect_addrs = set()
397         for addr_df in map(parse, args.p2pool_nodes):
398             try:
399                 connect_addrs.add((yield addr_df))
400             except:
401                 log.err()
402         
403         p2p_node = Node(
404             best_share_hash_func=lambda: best_share_var.value,
405             port=args.p2pool_port,
406             net=net,
407             addr_store=addrs,
408             connect_addrs=connect_addrs,
409             max_incoming_conns=args.p2pool_conns,
410             traffic_happened=traffic_happened,
411             known_txs_var=known_txs_var,
412             mining_txs_var=mining_txs_var,
413         )
414         p2p_node.start()
415         
416         def forget_old_txs():
417             new_known_txs = {}
418             for peer in p2p_node.peers.itervalues():
419                 new_known_txs.update(peer.remembered_txs)
420             new_known_txs.update(mining_txs_var.value)
421             for share in tracker.get_chain(best_share_var.value, min(120, tracker.get_height(best_share_var.value))):
422                 for tx_hash in share.new_transaction_hashes:
423                     if tx_hash in known_txs_var.value:
424                         new_known_txs[tx_hash] = known_txs_var.value[tx_hash]
425             known_txs_var.set(new_known_txs)
426         task.LoopingCall(forget_old_txs).start(10)
427         
428         def save_addrs():
429             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
430                 f.write(json.dumps(p2p_node.addr_store.items()))
431         task.LoopingCall(save_addrs).start(60)
432         
        @best_block_header.changed.watch
        def _(header):
            # relay our new best block header to every connected peer
            for peer in p2p_node.peers.itervalues():
                peer.send_bestblock(header=header)
437         
        @defer.inlineCallbacks
        def broadcast_share(share_hash):
            """Send a share (and up to 4 not-yet-shared ancestors) to all peers.

            Walks back along the chain from share_hash, stopping at the first
            share already broadcast; marks everything collected as shared,
            then sends to each peer except the one a share came from.
            """
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in list(p2p_node.peers.itervalues()):
                yield peer.sendShares([share for share in shares if share.peer is not peer], tracker, known_txs_var.value, include_txs_with=[share_hash])
449         
450         # send share when the chain changes to their chain
451         best_share_var.changed.watch(broadcast_share)
452         
453         def save_shares():
454             for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
455                 ss.add_share(share)
456                 if share.hash in tracker.verified.items:
457                     ss.add_verified_hash(share.hash)
458         task.LoopingCall(save_shares).start(60)
459         
        @apply
        @defer.inlineCallbacks
        def download_shares():
            """Background loop that fetches missing parent shares from peers.

            Waits until the tracker wants something (desired_var non-empty),
            then asks a random connected peer for the share plus up to 500
            parents, and feeds the result through normal share handling.
            """
            while True:
                desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
                peer2, share_hash = random.choice(desired)
                
                if len(p2p_node.peers) == 0:
                    yield deferral.sleep(1)
                    continue
                # NOTE: the peer that advertised the share (peer2) is not
                # necessarily the one asked -- a random connected peer is used
                peer = random.choice(p2p_node.peers.values())
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                try:
                    shares = yield peer.get_shares(
                        hashes=[share_hash],
                        parents=500,
                        stops=[],
                    )
                except:
                    log.err(None, 'in download_shares:')
                    continue
                
                if not shares:
                    yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
                    continue
                p2p_node.handle_shares(shares, peer)
487         
488         print '    ...success!'
489         print
490         
491         if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                """Periodically (re-)register a UPnP port mapping for the p2pool port.

                Repeats forever with a random delay (mean 120s); timeouts are
                ignored, other errors are logged only in DEBUG mode.
                """
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
507         
508         # start listening for workers with a JSON-RPC server
509         
510         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
511         
512         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
513         
514         wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
515         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened, args.donation_percentage)
516         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
517         
518         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
519         
520         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
521             pass
522         
523         print '    ...success!'
524         print
525         
526         
527         # done!
528         print 'Started successfully!'
529         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
530         if args.donation_percentage > 0.51:
531             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
532         elif args.donation_percentage < 0.49:
533             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
534         else:
535             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
536             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
537         print
538         
539         
540         if hasattr(signal, 'SIGALRM'):
541             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
542                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
543             ))
544             signal.siginterrupt(signal.SIGALRM, False)
545             task.LoopingCall(signal.alarm, 30).start(1)
546         
547         if args.irc_announce:
548             from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                """Announces newly found blocks on the network's IRC channel."""
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    # once connected: join the channel and start watching the
                    # share tracker for verified block-solving shares
                    self.in_channel = False
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        # announce a recent block-solving share after a random
                        # delay (mean 60s), unless someone else already did
                        if not self.in_channel:
                            return
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def joined(self, channel):
                    self.in_channel = True
                def left(self, channel):
                    self.in_channel = False
                def _remember_message(self, message):
                    # bounded history used to suppress duplicate announcements
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                # reconnecting factory so announcements survive IRC disconnects
                protocol = IRCClient
589             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
590         
        @defer.inlineCallbacks
        def status_thread():
            # Forever loop that prints a status summary (chain height, peers,
            # local/pool hash rates, stale rates, payout, warnings) every 3
            # seconds, but only re-prints identical output every 15 seconds.
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    height = tracker.get_height(best_share_var.value)
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.items),
                        len(tracker.items),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    # Local miner statistics; datums are work samples over the
                    # last dt seconds from the work bridge's rate monitor.
                    datums, dt = wb.local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                    )
                    
                    # Pool-wide statistics need a little chain history to be meaningful.
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
                        # Divide by (1 - stale_prop) to estimate the true pool rate
                        # including work lost to stale shares.
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
                        )
                        
                        # Surface protocol/version warnings prominently on stderr.
                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    # Print only when something changed, or at most every 15s.
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
644         status_thread()
645     except:
646         reactor.stop()
647         log.err(None, 'Fatal error:')
648
649 def run():
650     realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
651     
652     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
653     parser.add_argument('--version', action='version', version=p2pool.__version__)
654     parser.add_argument('--net',
655         help='use specified network (default: bitcoin)',
656         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
657     parser.add_argument('--testnet',
658         help='''use the network's testnet''',
659         action='store_const', const=True, default=False, dest='testnet')
660     parser.add_argument('--debug',
661         help='enable debugging mode',
662         action='store_const', const=True, default=False, dest='debug')
663     parser.add_argument('-a', '--address',
664         help='generate payouts to this address (default: <address requested from bitcoind>)',
665         type=str, action='store', default=None, dest='address')
666     parser.add_argument('--datadir',
667         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
668         type=str, action='store', default=None, dest='datadir')
669     parser.add_argument('--logfile',
670         help='''log to this file (default: data/<NET>/log)''',
671         type=str, action='store', default=None, dest='logfile')
672     parser.add_argument('--merged',
673         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
674         type=str, action='append', default=[], dest='merged_urls')
675     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
676         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
677         type=float, action='store', default=0.5, dest='donation_percentage')
678     parser.add_argument('--iocp',
679         help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
680         action='store_true', default=False, dest='iocp')
681     parser.add_argument('--irc-announce',
682         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
683         action='store_true', default=False, dest='irc_announce')
684     parser.add_argument('--no-bugreport',
685         help='disable submitting caught exceptions to the author',
686         action='store_true', default=False, dest='no_bugreport')
687     
688     p2pool_group = parser.add_argument_group('p2pool interface')
689     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
690         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
691         type=int, action='store', default=None, dest='p2pool_port')
692     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
693         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
694         type=str, action='append', default=[], dest='p2pool_nodes')
695     parser.add_argument('--disable-upnp',
696         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
697         action='store_false', default=True, dest='upnp')
698     p2pool_group.add_argument('--max-conns', metavar='CONNS',
699         help='maximum incoming connections (default: 40)',
700         type=int, action='store', default=40, dest='p2pool_conns')
701     
702     worker_group = parser.add_argument_group('worker interface')
703     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
704         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
705         type=str, action='store', default=None, dest='worker_endpoint')
706     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
707         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
708         type=float, action='store', default=0, dest='worker_fee')
709     
710     bitcoind_group = parser.add_argument_group('bitcoind interface')
711     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
712         help='connect to this address (default: 127.0.0.1)',
713         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
714     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
715         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
716         type=int, action='store', default=None, dest='bitcoind_rpc_port')
717     bitcoind_group.add_argument('--bitcoind-rpc-ssl',
718         help='connect to JSON-RPC interface using SSL',
719         action='store_true', default=False, dest='bitcoind_rpc_ssl')
720     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
721         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
722         type=int, action='store', default=None, dest='bitcoind_p2p_port')
723     
724     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
725         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
726         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
727     
728     args = parser.parse_args()
729     
730     if args.debug:
731         p2pool.DEBUG = True
732         defer.setDebugging(True)
733     
734     net_name = args.net_name + ('_testnet' if args.testnet else '')
735     net = networks.nets[net_name]
736     
737     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
738     if not os.path.exists(datadir_path):
739         os.makedirs(datadir_path)
740     
741     if len(args.bitcoind_rpc_userpass) > 2:
742         parser.error('a maximum of two arguments are allowed')
743     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
744     
745     if args.bitcoind_rpc_password is None:
746         conf_path = net.PARENT.CONF_FILE_FUNC()
747         if not os.path.exists(conf_path):
748             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
749                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
750                 '''\r\n'''
751                 '''server=1\r\n'''
752                 '''rpcpassword=%x\r\n'''
753                 '''\r\n'''
754                 '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
755         conf = open(conf_path, 'rb').read()
756         contents = {}
757         for line in conf.splitlines(True):
758             if '#' in line:
759                 line = line[:line.index('#')]
760             if '=' not in line:
761                 continue
762             k, v = line.split('=', 1)
763             contents[k.strip()] = v.strip()
764         for conf_name, var_name, var_type in [
765             ('rpcuser', 'bitcoind_rpc_username', str),
766             ('rpcpassword', 'bitcoind_rpc_password', str),
767             ('rpcport', 'bitcoind_rpc_port', int),
768             ('port', 'bitcoind_p2p_port', int),
769         ]:
770             if getattr(args, var_name) is None and conf_name in contents:
771                 setattr(args, var_name, var_type(contents[conf_name]))
772         if args.bitcoind_rpc_password is None:
773             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
774     
775     if args.bitcoind_rpc_username is None:
776         args.bitcoind_rpc_username = ''
777     
778     if args.bitcoind_rpc_port is None:
779         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
780     
781     if args.bitcoind_p2p_port is None:
782         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
783     
784     if args.p2pool_port is None:
785         args.p2pool_port = net.P2P_PORT
786     
787     if args.worker_endpoint is None:
788         worker_endpoint = '', net.WORKER_PORT
789     elif ':' not in args.worker_endpoint:
790         worker_endpoint = '', int(args.worker_endpoint)
791     else:
792         addr, port = args.worker_endpoint.rsplit(':', 1)
793         worker_endpoint = addr, int(port)
794     
795     if args.address is not None:
796         try:
797             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
798         except Exception, e:
799             parser.error('error parsing address: ' + repr(e))
800     else:
801         args.pubkey_hash = None
802     
803     def separate_url(url):
804         s = urlparse.urlsplit(url)
805         if '@' not in s.netloc:
806             parser.error('merged url netloc must contain an "@"')
807         userpass, new_netloc = s.netloc.rsplit('@', 1)
808         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
809     merged_urls = map(separate_url, args.merged_urls)
810     
811     if args.logfile is None:
812         args.logfile = os.path.join(datadir_path, 'log')
813     
814     logfile = logging.LogFile(args.logfile)
815     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
816     sys.stdout = logging.AbortPipe(pipe)
817     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
818     if hasattr(signal, "SIGUSR1"):
819         def sigusr1(signum, frame):
820             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
821             logfile.reopen()
822             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
823         signal.signal(signal.SIGUSR1, sigusr1)
824     task.LoopingCall(logfile.reopen).start(5)
825     
826     class ErrorReporter(object):
827         def __init__(self):
828             self.last_sent = None
829         
830         def emit(self, eventDict):
831             if not eventDict["isError"]:
832                 return
833             
834             if self.last_sent is not None and time.time() < self.last_sent + 5:
835                 return
836             self.last_sent = time.time()
837             
838             if 'failure' in eventDict:
839                 text = ((eventDict.get('why') or 'Unhandled Error')
840                     + '\n' + eventDict['failure'].getTraceback())
841             else:
842                 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
843             
844             from twisted.web import client
845             client.getPage(
846                 url='http://u.forre.st/p2pool_error.cgi',
847                 method='POST',
848                 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
849                 timeout=15,
850             ).addBoth(lambda x: None)
851     if not args.no_bugreport:
852         log.addObserver(ErrorReporter().emit)
853     
854     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
855     reactor.run()