efficiency improvement - cache transaction hashes in getwork response object
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import base64
4 import json
5 import os
6 import random
7 import sys
8 import time
9 import signal
10 import traceback
11 import urlparse
12
13 if '--iocp' in sys.argv:
14     from twisted.internet import iocpreactor
15     iocpreactor.install()
16 from twisted.internet import defer, reactor, protocol, task
17 from twisted.web import server
18 from twisted.python import log
19 from nattraverso import portmapper, ipdiscover
20
21 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
22 from bitcoin import worker_interface, height_tracker
23 from util import fixargparse, jsonrpc, variable, deferral, math, logging
24 from . import p2p, networks, web, work
25 import p2pool, p2pool.data as p2pool_data
26
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, use_getblocktemplate=False):
    '''Fetch a block template from bitcoind and normalize it into a dict.

    Tries getblocktemplate or getmemorypool (per use_getblocktemplate),
    falling back to the other call when bitcoind reports the method as
    missing. The returned dict (delivered via the Deferred) carries the
    unpacked transactions along with their precomputed hashes so that
    downstream consumers don't need to rehash them.
    '''
    def fetch():
        # reads use_getblocktemplate from the enclosing scope so the
        # rebinding in the fallback path below affects the retry
        if use_getblocktemplate:
            return bitcoind.rpc_getblocktemplate(dict(mode='template'))
        return bitcoind.rpc_getmemorypool()
    try:
        work = yield fetch()
    except jsonrpc.Error_for_code(-32601): # Method not found
        use_getblocktemplate = not use_getblocktemplate
        try:
            work = yield fetch()
        except jsonrpc.Error_for_code(-32601): # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    
    # transactions come either as raw hex strings (getmemorypool) or as
    # dicts with a 'data' field (getblocktemplate)
    packed_transactions = []
    for entry in work['transactions']:
        raw_hex = entry['data'] if isinstance(entry, dict) else entry
        packed_transactions.append(raw_hex.decode('hex'))
    
    if 'height' not in work:
        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    elif p2pool.DEBUG:
        assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    
    # 'bits' may be hex-encoded (string) or already numeric
    if isinstance(work['bits'], (str, unicode)):
        bits = bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1])
    else:
        bits = bitcoin_data.FloatingInteger(work['bits'])
    
    if 'coinbaseflags' in work:
        coinbaseflags = work['coinbaseflags'].decode('hex')
    elif 'coinbaseaux' in work:
        coinbaseflags = ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues())
    else:
        coinbaseflags = ''
    
    defer.returnValue(dict(
        version=work['version'],
        previous_block=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        transaction_hashes=map(bitcoin_data.hash256, packed_transactions), # cached so callers need not rehash
        subsidy=work['coinbasevalue'],
        time=work['time'] if 'time' in work else work['curtime'],
        bits=bits,
        coinbaseflags=coinbaseflags,
        height=work['height'],
        last_update=time.time(),
        use_getblocktemplate=use_getblocktemplate,
    ))
62
63 @defer.inlineCallbacks
64 def main(args, net, datadir_path, merged_urls, worker_endpoint):
65     try:
66         print 'p2pool (version %s)' % (p2pool.__version__,)
67         print
68         
69         traffic_happened = variable.Event()
70         
        @defer.inlineCallbacks
        def connect_p2p():
            '''Open a bitcoin-P2P connection to bitcoind and wait for the handshake.
            
            Returns (via the Deferred) the connected ClientFactory.
            '''
            # connect to bitcoind over bitcoin-p2p
            print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
            factory = bitcoin_p2p.ClientFactory(net.PARENT)
            reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
            yield factory.getProtocol() # waits until handshake is successful
            print '    ...success!'
            print
            defer.returnValue(factory)
81         
82         if args.testnet: # establish p2p connection first if testnet so bitcoind can work without connections
83             factory = yield connect_p2p()
84         
85         # connect to bitcoind over JSON-RPC and do initial getmemorypool
86         url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
87         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
88         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            '''Verify the RPC endpoint is the right network's bitcoind and new enough.'''
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
                print >>sys.stderr, '    Bitcoin version too old! Upgrade to 0.6.4 or newer!'
                raise deferral.RetrySilentlyException()
98         yield check()
99         temp_work = yield getwork(bitcoind)
100         
101         if not args.testnet:
102             factory = yield connect_p2p()
103         
104         block_height_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_height():
            '''Refresh block_height_var from bitcoind's getblockcount (with retry).'''
            block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
108         yield poll_height()
109         task.LoopingCall(poll_height).start(60*60)
110         
111         bitcoind_warning_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_warnings():
            '''Refresh bitcoind_warning_var with any warnings bitcoind reports.'''
            errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
            # store None rather than '' when there is nothing to report
            bitcoind_warning_var.set(errors if errors != '' else None)
116         yield poll_warnings()
117         task.LoopingCall(poll_warnings).start(20*60)
118         
119         print '    ...success!'
120         print '    Current block hash: %x' % (temp_work['previous_block'],)
121         print '    Current block height: %i' % (block_height_var.value,)
122         print
123         
124         print 'Determining payout address...'
125         if args.pubkey_hash is None:
126             address_path = os.path.join(datadir_path, 'cached_payout_address')
127             
128             if os.path.exists(address_path):
129                 with open(address_path, 'rb') as f:
130                     address = f.read().strip('\r\n')
131                 print '    Loaded cached address: %s...' % (address,)
132             else:
133                 address = None
134             
135             if address is not None:
136                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
137                 if not res['isvalid'] or not res['ismine']:
138                     print '    Cached address is either invalid or not controlled by local bitcoind!'
139                     address = None
140             
141             if address is None:
142                 print '    Getting payout address from bitcoind...'
143                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
144             
145             with open(address_path, 'wb') as f:
146                 f.write(address)
147             
148             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
149         else:
150             my_pubkey_hash = args.pubkey_hash
151         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
152         print
153         
154         my_share_hashes = set()
155         my_doa_share_hashes = set()
156         
157         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
158         shared_share_hashes = set()
159         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
160         known_verified = set()
161         print "Loading shares..."
162         for i, (mode, contents) in enumerate(ss.get_shares()):
163             if mode == 'share':
164                 if contents.hash in tracker.items:
165                     continue
166                 shared_share_hashes.add(contents.hash)
167                 contents.time_seen = 0
168                 tracker.add(contents)
169                 if len(tracker.items) % 1000 == 0 and tracker.items:
170                     print "    %i" % (len(tracker.items),)
171             elif mode == 'verified_hash':
172                 known_verified.add(contents)
173             else:
174                 raise AssertionError()
175         print "    ...inserting %i verified shares..." % (len(known_verified),)
176         for h in known_verified:
177             if h not in tracker.items:
178                 ss.forget_verified_share(h)
179                 continue
180             tracker.verified.add(tracker.items[h])
181         print "    ...done loading %i shares!" % (len(tracker.items),)
182         print
183         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
184         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
185         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
186         
187         print 'Initializing work...'
188         
189         
190         # BITCOIND WORK
191         
192         bitcoind_work = variable.Variable((yield getwork(bitcoind)))
        @defer.inlineCallbacks
        def work_poller():
            '''Forever: refresh bitcoind_work, then wait for a new block or 15 seconds.'''
            while True:
                # grab the new-block deferred *before* polling so a block
                # arriving during the poll still wakes us immediately
                flag = factory.new_block.get_deferred()
                try:
                    bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
                except:
                    # getwork already retries internally; log whatever still escapes
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
202         work_poller()
203         
204         # PEER WORK
205         
206         best_block_header = variable.Variable(None)
        def handle_header(new_header):
            '''Consider a header (from bitcoind or a peer) as the new best block header.
            
            Accepts it only if it meets the current network target and relates
            to bitcoind's current best block per the conditions annotated below.
            '''
            # check that header matches current target
            if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
                return
            bitcoind_best_block = bitcoind_work.value['previous_block']
            if (best_block_header.value is None
                or (
                    new_header['previous_block'] == bitcoind_best_block and
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
                ) # new is child of current and previous is current
                or (
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
                    best_block_header.value['previous_block'] != bitcoind_best_block
                )): # new is current and previous is not a child of current
                best_block_header.set(new_header)
        @defer.inlineCallbacks
        def poll_header():
            '''Ask bitcoind over p2p for the current best header and feed it to handle_header.'''
            handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
225         bitcoind_work.changed.watch(lambda _: poll_header())
226         yield deferral.retry('Error while requesting best block header:')(poll_header)()
227         
228         # BEST SHARE
229         
230         known_txs_var = variable.Variable({}) # hash -> tx
231         mining_txs_var = variable.Variable({}) # hash -> tx
232         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
233         
234         best_share_var = variable.Variable(None)
235         desired_var = variable.Variable(None)
236         def set_best_share():
237             best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'], known_txs_var.value)
238             
239             best_share_var.set(best)
240             desired_var.set(desired)
241         bitcoind_work.changed.watch(lambda _: set_best_share())
242         set_best_share()
243         
244         print '    ...success!'
245         print
246         
247         # setup p2p logic and join p2pool network
248         
249         # update mining_txs according to getwork results
        @bitcoind_work.changed.run_and_watch
        def _(_=None):
            # mining_txs holds exactly the txs of the current template;
            # known_txs additionally retains everything seen before
            new_mining_txs = {}
            new_known_txs = dict(known_txs_var.value)
            # transaction_hashes is precomputed by getwork, so no rehashing here
            for tx_hash, tx in zip(bitcoind_work.value['transaction_hashes'], bitcoind_work.value['transactions']):
                new_mining_txs[tx_hash] = tx
                new_known_txs[tx_hash] = tx
            mining_txs_var.set(new_mining_txs)
            known_txs_var.set(new_known_txs)
259         # add p2p transactions from bitcoind to known_txs
        @factory.new_tx.watch
        def _(tx):
            # index transactions relayed by bitcoind under their hash
            new_known_txs = dict(known_txs_var.value)
            new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
            known_txs_var.set(new_known_txs)
265         # forward transactions seen to bitcoind
        @known_txs_var.transitioned.watch
        def _(before, after):
            # relay every newly-learned transaction to bitcoind
            for tx_hash in set(after) - set(before):
                factory.conn.value.send_tx(tx=after[tx_hash])
270         
        class Node(p2p.Node):
            '''p2pool network node: receives, requests, and serves shares.'''
            def handle_shares(self, shares, peer):
                '''Add newly received shares to the tracker and refresh the best share.'''
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.items:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                # only re-evaluate the chain when something actually changed
                if new_count:
                    set_best_share()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
            
            @defer.inlineCallbacks
            def handle_share_hashes(self, hashes, peer):
                '''Fetch any announced share hashes we don't already have from the peer.'''
                new_hashes = [x for x in hashes if x not in tracker.items]
                if not new_hashes:
                    return
                try:
                    shares = yield peer.get_shares(
                        hashes=new_hashes,
                        parents=0,
                        stops=[],
                    )
                except:
                    log.err(None, 'in handle_share_hashes:')
                else:
                    self.handle_shares(shares, peer)
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                '''Serve a peer's share request, capping total work at ~1000 shares.'''
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
            
            def handle_bestblock(self, header, peer):
                '''Validate a peer-announced best block header, then consider adopting it.'''
                if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
                    raise p2p.PeerMisbehavingError('received block header fails PoW test')
                handle_header(header)
326         
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            '''Broadcast a solved block to bitcoind over the p2p connection.'''
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
333         
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            '''Submit a solved block over JSON-RPC and sanity-check the result.
            
            Uses submitblock (getblocktemplate mode) or getmemorypool, and
            warns when acceptance disagrees with what the proof-of-work predicts.
            '''
            if bitcoind_work.value['use_getblocktemplate']:
                result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result is None # submitblock returns null on success
            else:
                result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
346         
        def submit_block(block, ignore_failure):
            '''Submit a solved block via both p2p and RPC for redundancy.'''
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
350         
        @tracker.verified.added.watch
        def _(share):
            # a verified share that meets the full block target is a solved block
            if share.pow_hash <= share.header['bits'].target:
                block = share.as_block(tracker, known_txs_var.value)
                if block is None:
                    print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                    return
                submit_block(block, ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    # only rebroadcast when the block is near the current chain tip
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
368         
369         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
370         
371         @defer.inlineCallbacks
372         def parse(x):
373             if ':' in x:
374                 ip, port = x.split(':')
375                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
376             else:
377                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
378         
379         addrs = {}
380         if os.path.exists(os.path.join(datadir_path, 'addrs')):
381             try:
382                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
383                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
384             except:
385                 print >>sys.stderr, 'error parsing addrs'
386         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
387             try:
388                 addr = yield addr_df
389                 if addr not in addrs:
390                     addrs[addr] = (0, time.time(), time.time())
391             except:
392                 log.err()
393         
394         connect_addrs = set()
395         for addr_df in map(parse, args.p2pool_nodes):
396             try:
397                 connect_addrs.add((yield addr_df))
398             except:
399                 log.err()
400         
401         p2p_node = Node(
402             best_share_hash_func=lambda: best_share_var.value,
403             port=args.p2pool_port,
404             net=net,
405             addr_store=addrs,
406             connect_addrs=connect_addrs,
407             max_incoming_conns=args.p2pool_conns,
408             traffic_happened=traffic_happened,
409             known_txs_var=known_txs_var,
410             mining_txs_var=mining_txs_var,
411         )
412         p2p_node.start()
413         
        def forget_old_txs():
            '''Drop known transactions nothing references anymore.
            
            Keeps txs remembered by peers, txs in the current mining template,
            and txs introduced by the last 120 shares of the best chain.
            '''
            new_known_txs = {}
            for peer in p2p_node.peers.itervalues():
                new_known_txs.update(peer.remembered_txs)
            new_known_txs.update(mining_txs_var.value)
            for share in tracker.get_chain(best_share_var.value, min(120, tracker.get_height(best_share_var.value))):
                for tx_hash in share.new_transaction_hashes:
                    if tx_hash in known_txs_var.value:
                        new_known_txs[tx_hash] = known_txs_var.value[tx_hash]
            known_txs_var.set(new_known_txs)
424         task.LoopingCall(forget_old_txs).start(10)
425         
426         def save_addrs():
427             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
428                 f.write(json.dumps(p2p_node.addr_store.items()))
429         task.LoopingCall(save_addrs).start(60)
430         
        @best_block_header.changed.watch
        def _(header):
            # gossip the new best header to every connected peer
            for peer in p2p_node.peers.itervalues():
                peer.send_bestblock(header=header)
435         
        @defer.inlineCallbacks
        def broadcast_share(share_hash):
            '''Send the given share (plus up to 4 not-yet-shared ancestors) to all peers.'''
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break # everything past this point was already broadcast
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in list(p2p_node.peers.itervalues()):
                # don't echo a share back to the peer it came from
                yield peer.sendShares([share for share in shares if share.peer is not peer], tracker, known_txs_var.value, include_txs_with=[share_hash])
447         
448         # send share when the chain changes to their chain
449         best_share_var.changed.watch(broadcast_share)
450         
        def save_shares():
            '''Write the best chain (up to 2*CHAIN_LENGTH shares) to the share store.'''
            for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.items:
                    ss.add_verified_hash(share.hash)
456         task.LoopingCall(save_shares).start(60)
457         
        @apply
        @defer.inlineCallbacks
        def download_shares():
            '''Forever: download desired (missing-ancestor) shares from random peers.'''
            while True:
                desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
                peer2, share_hash = random.choice(desired)
                
                if len(p2p_node.peers) == 0:
                    yield deferral.sleep(1)
                    continue
                # ask a random peer, not necessarily the one that advertised the share
                peer = random.choice(p2p_node.peers.values())
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                try:
                    shares = yield peer.get_shares(
                        hashes=[share_hash],
                        parents=500,
                        stops=[],
                    )
                except:
                    log.err(None, 'in download_shares:')
                    continue
                
                if not shares:
                    yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
                    continue
                p2p_node.handle_shares(shares, peer)
485         
486         print '    ...success!'
487         print
488         
489         if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                '''Forever: try to keep a UPnP mapping for the p2pool port on the LAN gateway.'''
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass # mapping attempt timed out; just try again next round
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
504             upnp_thread()
505         
506         # start listening for workers with a JSON-RPC server
507         
508         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
509         
510         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
511         
512         wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
513         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened)
514         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
515         
516         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
517         
518         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
519             pass
520         
521         print '    ...success!'
522         print
523         
524         
525         # done!
526         print 'Started successfully!'
527         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
528         if args.donation_percentage > 0.51:
529             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
530         elif args.donation_percentage < 0.49:
531             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
532         else:
533             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
534             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
535         print
536         
537         
538         if hasattr(signal, 'SIGALRM'):
539             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
540                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
541             ))
542             signal.siginterrupt(signal.SIGALRM, False)
543             task.LoopingCall(signal.alarm, 30).start(1)
544         
545         if args.irc_announce:
546             from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                '''Announces blocks found by the pool on the network's IRC channel.'''
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        # announce only fresh block-solving shares, after a random delay
                        # and a duplicate check so multiple nodes don't spam the channel
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    # keep a bounded history for the duplicate-announcement check
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                '''Reconnecting factory for the block-announcement IRC connection.'''
                protocol = IRCClient
580             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
581         
        @defer.inlineCallbacks
        def status_thread():
            # Loop forever, printing a multi-line status summary (share-chain
            # height, peer counts, local/pool hash rates, stale rates, current
            # payout) every 3 seconds. Identical text is reprinted at most
            # once every 15 seconds to keep the log readable.
            last_str = None   # last summary printed
            last_time = 0     # wall-clock time of the last print
            while True:
                yield deferral.sleep(3)
                try:
                    height = tracker.get_height(best_share_var.value)
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.items),
                        len(tracker.items),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    # Local hash rate, estimated from recently submitted work.
                    datums, dt = wb.local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        # 2**256 / target = expected hashes per share; '???' until we have a rate and a best share
                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                    )
                    
                    # Pool-wide statistics need a few shares of history.
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
                        # Stale proportion and attempts/s averaged over up to an hour of shares.
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            # Efficiency: our stale rate relative to the pool's average.
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
                        )
                        
                        # Surface any health warnings prominently on stderr.
                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    # Print when the text changed, or every 15 s regardless.
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
635         status_thread()
636     except:
637         reactor.stop()
638         log.err(None, 'Fatal error:')
639
640 def run():
641     realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
642     
643     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
644     parser.add_argument('--version', action='version', version=p2pool.__version__)
645     parser.add_argument('--net',
646         help='use specified network (default: bitcoin)',
647         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
648     parser.add_argument('--testnet',
649         help='''use the network's testnet''',
650         action='store_const', const=True, default=False, dest='testnet')
651     parser.add_argument('--debug',
652         help='enable debugging mode',
653         action='store_const', const=True, default=False, dest='debug')
654     parser.add_argument('-a', '--address',
655         help='generate payouts to this address (default: <address requested from bitcoind>)',
656         type=str, action='store', default=None, dest='address')
657     parser.add_argument('--datadir',
658         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
659         type=str, action='store', default=None, dest='datadir')
660     parser.add_argument('--logfile',
661         help='''log to this file (default: data/<NET>/log)''',
662         type=str, action='store', default=None, dest='logfile')
663     parser.add_argument('--merged',
664         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
665         type=str, action='append', default=[], dest='merged_urls')
666     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
667         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
668         type=float, action='store', default=0.5, dest='donation_percentage')
669     parser.add_argument('--iocp',
670         help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
671         action='store_true', default=False, dest='iocp')
672     parser.add_argument('--irc-announce',
673         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
674         action='store_true', default=False, dest='irc_announce')
675     parser.add_argument('--no-bugreport',
676         help='disable submitting caught exceptions to the author',
677         action='store_true', default=False, dest='no_bugreport')
678     
679     p2pool_group = parser.add_argument_group('p2pool interface')
680     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
681         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
682         type=int, action='store', default=None, dest='p2pool_port')
683     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
684         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
685         type=str, action='append', default=[], dest='p2pool_nodes')
686     parser.add_argument('--disable-upnp',
687         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
688         action='store_false', default=True, dest='upnp')
689     p2pool_group.add_argument('--max-conns', metavar='CONNS',
690         help='maximum incoming connections (default: 40)',
691         type=int, action='store', default=40, dest='p2pool_conns')
692     
693     worker_group = parser.add_argument_group('worker interface')
694     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
695         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
696         type=str, action='store', default=None, dest='worker_endpoint')
697     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
698         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
699         type=float, action='store', default=0, dest='worker_fee')
700     
701     bitcoind_group = parser.add_argument_group('bitcoind interface')
702     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
703         help='connect to this address (default: 127.0.0.1)',
704         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
705     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
706         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
707         type=int, action='store', default=None, dest='bitcoind_rpc_port')
708     bitcoind_group.add_argument('--bitcoind-rpc-ssl',
709         help='connect to JSON-RPC interface using SSL',
710         action='store_true', default=False, dest='bitcoind_rpc_ssl')
711     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
712         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
713         type=int, action='store', default=None, dest='bitcoind_p2p_port')
714     
715     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
716         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
717         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
718     
719     args = parser.parse_args()
720     
721     if args.debug:
722         p2pool.DEBUG = True
723         defer.setDebugging(True)
724     
725     net_name = args.net_name + ('_testnet' if args.testnet else '')
726     net = networks.nets[net_name]
727     
728     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
729     if not os.path.exists(datadir_path):
730         os.makedirs(datadir_path)
731     
732     if len(args.bitcoind_rpc_userpass) > 2:
733         parser.error('a maximum of two arguments are allowed')
734     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
735     
736     if args.bitcoind_rpc_password is None:
737         conf_path = net.PARENT.CONF_FILE_FUNC()
738         if not os.path.exists(conf_path):
739             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
740                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
741                 '''\r\n'''
742                 '''server=1\r\n'''
743                 '''rpcpassword=%x\r\n'''
744                 '''\r\n'''
745                 '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
746         conf = open(conf_path, 'rb').read()
747         contents = {}
748         for line in conf.splitlines(True):
749             if '#' in line:
750                 line = line[:line.index('#')]
751             if '=' not in line:
752                 continue
753             k, v = line.split('=', 1)
754             contents[k.strip()] = v.strip()
755         for conf_name, var_name, var_type in [
756             ('rpcuser', 'bitcoind_rpc_username', str),
757             ('rpcpassword', 'bitcoind_rpc_password', str),
758             ('rpcport', 'bitcoind_rpc_port', int),
759             ('port', 'bitcoind_p2p_port', int),
760         ]:
761             if getattr(args, var_name) is None and conf_name in contents:
762                 setattr(args, var_name, var_type(contents[conf_name]))
763         if args.bitcoind_rpc_password is None:
764             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
765     
766     if args.bitcoind_rpc_username is None:
767         args.bitcoind_rpc_username = ''
768     
769     if args.bitcoind_rpc_port is None:
770         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
771     
772     if args.bitcoind_p2p_port is None:
773         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
774     
775     if args.p2pool_port is None:
776         args.p2pool_port = net.P2P_PORT
777     
778     if args.worker_endpoint is None:
779         worker_endpoint = '', net.WORKER_PORT
780     elif ':' not in args.worker_endpoint:
781         worker_endpoint = '', int(args.worker_endpoint)
782     else:
783         addr, port = args.worker_endpoint.rsplit(':', 1)
784         worker_endpoint = addr, int(port)
785     
786     if args.address is not None:
787         try:
788             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
789         except Exception, e:
790             parser.error('error parsing address: ' + repr(e))
791     else:
792         args.pubkey_hash = None
793     
794     def separate_url(url):
795         s = urlparse.urlsplit(url)
796         if '@' not in s.netloc:
797             parser.error('merged url netloc must contain an "@"')
798         userpass, new_netloc = s.netloc.rsplit('@', 1)
799         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
800     merged_urls = map(separate_url, args.merged_urls)
801     
802     if args.logfile is None:
803         args.logfile = os.path.join(datadir_path, 'log')
804     
805     logfile = logging.LogFile(args.logfile)
806     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
807     sys.stdout = logging.AbortPipe(pipe)
808     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
809     if hasattr(signal, "SIGUSR1"):
810         def sigusr1(signum, frame):
811             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
812             logfile.reopen()
813             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
814         signal.signal(signal.SIGUSR1, sigusr1)
815     task.LoopingCall(logfile.reopen).start(5)
816     
817     class ErrorReporter(object):
818         def __init__(self):
819             self.last_sent = None
820         
821         def emit(self, eventDict):
822             if not eventDict["isError"]:
823                 return
824             
825             if self.last_sent is not None and time.time() < self.last_sent + 5:
826                 return
827             self.last_sent = time.time()
828             
829             if 'failure' in eventDict:
830                 text = ((eventDict.get('why') or 'Unhandled Error')
831                     + '\n' + eventDict['failure'].getTraceback())
832             else:
833                 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
834             
835             from twisted.web import client
836             client.getPage(
837                 url='http://u.forre.st/p2pool_error.cgi',
838                 method='POST',
839                 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
840                 timeout=15,
841             ).addBoth(lambda x: None)
842     if not args.no_bugreport:
843         log.addObserver(ErrorReporter().emit)
844     
845     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
846     reactor.run()