fixed ordering of "Testing bitcoind XXX connection" messages during startup
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import base64
4 import json
5 import os
6 import random
7 import sys
8 import time
9 import signal
10 import traceback
11 import urlparse
12
# Install the Windows IOCP reactor if requested; this must happen before
# the `reactor` import below, since twisted locks in whichever reactor is
# installed at first import.
if '--iocp' in sys.argv:
    from twisted.internet import iocpreactor
    iocpreactor.install()
16 from twisted.internet import defer, reactor, protocol, task
17 from twisted.web import server
18 from twisted.python import log
19 from nattraverso import portmapper, ipdiscover
20
21 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
22 from bitcoin import worker_interface, height_tracker
23 from util import fixargparse, jsonrpc, variable, deferral, math, logging
24 from . import p2p, networks, web, work
25 import p2pool, p2pool.data as p2pool_data
26
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, use_getblocktemplate=False):
    """Fetch a block template from bitcoind and normalize it.

    Uses the getblocktemplate or the older getmemorypool RPC, switching to
    the other call once if the first is unknown to the daemon. Fires with a
    dict of decoded work fields (parent hash, transactions, subsidy, bits,
    height, ...) ready for share generation.
    """
    def fetch():
        # re-reads use_getblocktemplate from the enclosing scope on each
        # call, so the toggle below takes effect on the retry
        if use_getblocktemplate:
            return bitcoind.rpc_getblocktemplate(dict(mode='template'))
        return bitcoind.rpc_getmemorypool()
    try:
        work = yield fetch()
    except jsonrpc.Error_for_code(-32601): # Method not found
        # the daemon doesn't know this RPC -- try the other one once
        use_getblocktemplate = not use_getblocktemplate
        try:
            work = yield fetch()
        except jsonrpc.Error_for_code(-32601): # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    packed_transactions = [(tx['data'] if isinstance(tx, dict) else tx).decode('hex') for tx in work['transactions']]
    if 'height' not in work:
        # getmemorypool omits the height; derive it from the parent block
        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    elif p2pool.DEBUG:
        assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    # 'bits' arrives either as a byte-reversed hex string or as an integer
    if isinstance(work['bits'], (str, unicode)):
        bits = bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1])
    else:
        bits = bitcoin_data.FloatingInteger(work['bits'])
    # coinbase flags may be given directly or as a dict of aux entries
    if 'coinbaseflags' in work:
        coinbaseflags = work['coinbaseflags'].decode('hex')
    elif 'coinbaseaux' in work:
        coinbaseflags = ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues())
    else:
        coinbaseflags = ''
    defer.returnValue(dict(
        version=work['version'],
        previous_block=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        transaction_hashes=map(bitcoin_data.hash256, packed_transactions),
        subsidy=work['coinbasevalue'],
        time=work['time'] if 'time' in work else work['curtime'],
        bits=bits,
        coinbaseflags=coinbaseflags,
        height=work['height'],
        last_update=time.time(),
        use_getblocktemplate=use_getblocktemplate,
    ))
62
63 @defer.inlineCallbacks
64 def main(args, net, datadir_path, merged_urls, worker_endpoint):
65     try:
66         print 'p2pool (version %s)' % (p2pool.__version__,)
67         print
68         
69         traffic_happened = variable.Event()
70         
        @defer.inlineCallbacks
        def connect_p2p():
            # connect to bitcoind over bitcoin-p2p; fires with the client
            # factory once the protocol handshake has completed
            print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
            factory = bitcoin_p2p.ClientFactory(net.PARENT)
            reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
            yield factory.getProtocol() # waits until handshake is successful
            print '    ...success!'
            print
            defer.returnValue(factory)
81         
82         if args.testnet: # establish p2p connection first if testnet so bitcoind can work without connections
83             factory = yield connect_p2p()
84         
85         # connect to bitcoind over JSON-RPC and do initial getmemorypool
86         url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
87         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
88         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            # verify that the RPC endpoint belongs to the expected network and
            # that the daemon version is new enough; the decorator keeps
            # retrying until both checks pass
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
                print >>sys.stderr, '    Bitcoin version too old! Upgrade to 0.6.4 or newer!'
                raise deferral.RetrySilentlyException()
98         yield check()
99         temp_work = yield getwork(bitcoind)
100         
101         bitcoind_warning_var = variable.Variable(None)
102         @defer.inlineCallbacks
103         def poll_warnings():
104             errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
105             bitcoind_warning_var.set(errors if errors != '' else None)
106         yield poll_warnings()
107         task.LoopingCall(poll_warnings).start(20*60)
108         
109         print '    ...success!'
110         print '    Current block hash: %x' % (temp_work['previous_block'],)
111         print '    Current block height: %i' % (temp_work['height'] - 1,)
112         print
113         
114         if not args.testnet:
115             factory = yield connect_p2p()
116         
117         print 'Determining payout address...'
118         if args.pubkey_hash is None:
119             address_path = os.path.join(datadir_path, 'cached_payout_address')
120             
121             if os.path.exists(address_path):
122                 with open(address_path, 'rb') as f:
123                     address = f.read().strip('\r\n')
124                 print '    Loaded cached address: %s...' % (address,)
125             else:
126                 address = None
127             
128             if address is not None:
129                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
130                 if not res['isvalid'] or not res['ismine']:
131                     print '    Cached address is either invalid or not controlled by local bitcoind!'
132                     address = None
133             
134             if address is None:
135                 print '    Getting payout address from bitcoind...'
136                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
137             
138             with open(address_path, 'wb') as f:
139                 f.write(address)
140             
141             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
142         else:
143             my_pubkey_hash = args.pubkey_hash
144         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
145         print
146         
147         my_share_hashes = set()
148         my_doa_share_hashes = set()
149         
150         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
151         shared_share_hashes = set()
152         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
153         known_verified = set()
154         print "Loading shares..."
155         for i, (mode, contents) in enumerate(ss.get_shares()):
156             if mode == 'share':
157                 if contents.hash in tracker.items:
158                     continue
159                 shared_share_hashes.add(contents.hash)
160                 contents.time_seen = 0
161                 tracker.add(contents)
162                 if len(tracker.items) % 1000 == 0 and tracker.items:
163                     print "    %i" % (len(tracker.items),)
164             elif mode == 'verified_hash':
165                 known_verified.add(contents)
166             else:
167                 raise AssertionError()
168         print "    ...inserting %i verified shares..." % (len(known_verified),)
169         for h in known_verified:
170             if h not in tracker.items:
171                 ss.forget_verified_share(h)
172                 continue
173             tracker.verified.add(tracker.items[h])
174         print "    ...done loading %i shares!" % (len(tracker.items),)
175         print
176         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
177         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
178         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
179         
180         print 'Initializing work...'
181         
182         
183         # BITCOIND WORK
184         
185         bitcoind_work = variable.Variable((yield getwork(bitcoind)))
        @defer.inlineCallbacks
        def work_poller():
            # endlessly refresh bitcoind_work: re-poll when bitcoind announces
            # a new block over p2p or after 15 seconds, whichever comes first
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
195         work_poller()
196         
197         # PEER WORK
198         
199         best_block_header = variable.Variable(None)
        def handle_header(new_header):
            # Track the best known bitcoin block header: accept new_header only
            # if it passes PoW at the current difficulty and improves on what
            # we already have relative to bitcoind's current tip.
            # check that header matches current target
            if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
                return
            bitcoind_best_block = bitcoind_work.value['previous_block']
            if (best_block_header.value is None
                or (
                    new_header['previous_block'] == bitcoind_best_block and
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
                ) # new is child of current and previous is current
                or (
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
                    best_block_header.value['previous_block'] != bitcoind_best_block
                )): # new is current and previous is not a child of current
                best_block_header.set(new_header)
        @defer.inlineCallbacks
        def poll_header():
            # ask bitcoind (over p2p) for the header of its current tip and
            # feed it through the same acceptance logic as peer announcements
            handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
218         bitcoind_work.changed.watch(lambda _: poll_header())
219         yield deferral.retry('Error while requesting best block header:')(poll_header)()
220         
221         # BEST SHARE
222         
223         known_txs_var = variable.Variable({}) # hash -> tx
224         mining_txs_var = variable.Variable({}) # hash -> tx
225         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
226         
227         best_share_var = variable.Variable(None)
228         desired_var = variable.Variable(None)
229         def set_best_share():
230             best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'], known_txs_var.value)
231             
232             best_share_var.set(best)
233             desired_var.set(desired)
234         bitcoind_work.changed.watch(lambda _: set_best_share())
235         set_best_share()
236         
237         print '    ...success!'
238         print
239         
240         # setup p2p logic and join p2pool network
241         
242         # update mining_txs according to getwork results
        @bitcoind_work.changed.run_and_watch
        def _(_=None):
            # mining_txs becomes exactly the current template's transactions;
            # known_txs grows to include them (old entries are pruned elsewhere)
            new_mining_txs = {}
            new_known_txs = dict(known_txs_var.value)
            for tx_hash, tx in zip(bitcoind_work.value['transaction_hashes'], bitcoind_work.value['transactions']):
                new_mining_txs[tx_hash] = tx
                new_known_txs[tx_hash] = tx
            mining_txs_var.set(new_mining_txs)
            known_txs_var.set(new_known_txs)
252         # add p2p transactions from bitcoind to known_txs
        @factory.new_tx.watch
        def _(tx):
            # key by the transaction's hash (hash256 of its serialization)
            new_known_txs = dict(known_txs_var.value)
            new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
            known_txs_var.set(new_known_txs)
258         # forward transactions seen to bitcoind
        @known_txs_var.transitioned.watch
        @defer.inlineCallbacks
        def _(before, after):
            # short random delay (mean 1s), presumably to spread forwarding
            # bursts over time -- TODO confirm intent
            yield deferral.sleep(random.expovariate(1/1))
            for tx_hash in set(after) - set(before):
                factory.conn.value.send_tx(tx=after[tx_hash])
265         
        class Node(p2p.Node):
            """p2pool network node wired into this process's share tracker."""
            def handle_shares(self, shares, peer):
                # insert shares received from a peer, skipping known duplicates
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.items:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if new_count:
                    set_best_share()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
            
            @defer.inlineCallbacks
            def handle_share_hashes(self, hashes, peer):
                # a peer advertised share hashes; fetch the ones we don't have
                new_hashes = [x for x in hashes if x not in tracker.items]
                if not new_hashes:
                    return
                try:
                    shares = yield peer.get_shares(
                        hashes=new_hashes,
                        parents=0,
                        stops=[],
                    )
                except:
                    log.err(None, 'in handle_share_hashes:')
                else:
                    self.handle_shares(shares, peer)
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                # serve a peer's share request; cap the per-hash parent count so
                # the total served stays around 1000
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
            
            def handle_bestblock(self, header, peer):
                # peers announce their best bitcoin block header; reject ones
                # that fail their own claimed difficulty
                if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
                    raise p2p.PeerMisbehavingError('received block header fails PoW test')
                handle_header(header)
321         
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            # push a solved block to bitcoind over the p2p connection; the
            # decorator retries if the connection happens to be down
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
328         
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            # submit the block over JSON-RPC as well; submitblock signals
            # success by returning None, getmemorypool by returning truthy
            if bitcoind_work.value['use_getblocktemplate']:
                result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result is None
            else:
                result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result
            # a block meeting the target should be accepted; complain when the
            # daemon's verdict disagrees with our PoW check in either direction
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
341         
        def submit_block(block, ignore_failure):
            # submit via both channels: p2p and RPC (RPC reports acceptance)
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
345         
        @tracker.verified.added.watch
        def _(share):
            # when a newly-verified share actually solves a bitcoin block,
            # pass the block to bitcoind and broadcast the share to peers
            if share.pow_hash <= share.header['bits'].target:
                block = share.as_block(tracker, known_txs_var.value)
                if block is None:
                    print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                    return
                submit_block(block, ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    # only relay while the block is still near/at the current tip
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
363         
364         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
365         
366         @defer.inlineCallbacks
367         def parse(x):
368             if ':' in x:
369                 ip, port = x.split(':')
370                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
371             else:
372                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
373         
374         addrs = {}
375         if os.path.exists(os.path.join(datadir_path, 'addrs')):
376             try:
377                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
378                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
379             except:
380                 print >>sys.stderr, 'error parsing addrs'
381         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
382             try:
383                 addr = yield addr_df
384                 if addr not in addrs:
385                     addrs[addr] = (0, time.time(), time.time())
386             except:
387                 log.err()
388         
389         connect_addrs = set()
390         for addr_df in map(parse, args.p2pool_nodes):
391             try:
392                 connect_addrs.add((yield addr_df))
393             except:
394                 log.err()
395         
396         p2p_node = Node(
397             best_share_hash_func=lambda: best_share_var.value,
398             port=args.p2pool_port,
399             net=net,
400             addr_store=addrs,
401             connect_addrs=connect_addrs,
402             max_incoming_conns=args.p2pool_conns,
403             traffic_happened=traffic_happened,
404             known_txs_var=known_txs_var,
405             mining_txs_var=mining_txs_var,
406         )
407         p2p_node.start()
408         
409         def forget_old_txs():
410             new_known_txs = {}
411             for peer in p2p_node.peers.itervalues():
412                 new_known_txs.update(peer.remembered_txs)
413             new_known_txs.update(mining_txs_var.value)
414             for share in tracker.get_chain(best_share_var.value, min(120, tracker.get_height(best_share_var.value))):
415                 for tx_hash in share.new_transaction_hashes:
416                     if tx_hash in known_txs_var.value:
417                         new_known_txs[tx_hash] = known_txs_var.value[tx_hash]
418             known_txs_var.set(new_known_txs)
419         task.LoopingCall(forget_old_txs).start(10)
420         
        def save_addrs():
            # persist the peer address store to disk (reloaded at startup)
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
424         task.LoopingCall(save_addrs).start(60)
425         
        @best_block_header.changed.watch
        def _(header):
            # relay our new best bitcoin block header to every connected peer
            for peer in p2p_node.peers.itervalues():
                peer.send_bestblock(header=header)
430         
        @defer.inlineCallbacks
        def broadcast_share(share_hash):
            # send the given share, plus up to 4 not-yet-shared ancestors, to
            # every peer -- except back to the peer a share arrived from
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in list(p2p_node.peers.itervalues()):
                yield peer.sendShares([share for share in shares if share.peer is not peer], tracker, known_txs_var.value, include_txs_with=[share_hash])
442         
443         # send share when the chain changes to their chain
444         best_share_var.changed.watch(broadcast_share)
445         
        def save_shares():
            # write the last 2*CHAIN_LENGTH shares of the best chain (and their
            # verified status) to the share store; presumably the store skips
            # shares it already holds -- TODO confirm
            for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.items:
                    ss.add_verified_hash(share.hash)
451         task.LoopingCall(save_shares).start(60)
452         
453         @apply
454         @defer.inlineCallbacks
455         def download_shares():
456             while True:
457                 desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
458                 peer2, share_hash = random.choice(desired)
459                 
460                 if len(p2p_node.peers) == 0:
461                     yield deferral.sleep(1)
462                     continue
463                 peer = random.choice(p2p_node.peers.values())
464                 
465                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
466                 try:
467                     shares = yield peer.get_shares(
468                         hashes=[share_hash],
469                         parents=500,
470                         stops=[],
471                     )
472                 except:
473                     log.err(None, 'in download_shares:')
474                     continue
475                 
476                 if not shares:
477                     yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
478                     continue
479                 p2p_node.handle_shares(shares, peer)
480         
481         print '    ...success!'
482         print
483         
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                # periodically (mean interval 120s, jittered) re-establish a
                # UPnP port mapping for the p2pool port on the LAN gateway
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
500         
501         # start listening for workers with a JSON-RPC server
502         
503         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
504         
505         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
506         
507         wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share)
508         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened, args.donation_percentage)
509         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
510         
511         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
512         
513         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
514             pass
515         
516         print '    ...success!'
517         print
518         
519         
520         # done!
521         print 'Started successfully!'
522         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
523         if args.donation_percentage > 0.51:
524             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
525         elif args.donation_percentage < 0.49:
526             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
527         else:
528             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
529             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
530         print
531         
532         
533         if hasattr(signal, 'SIGALRM'):
534             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
535                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
536             ))
537             signal.siginterrupt(signal.SIGALRM, False)
538             task.LoopingCall(signal.alarm, 30).start(1)
539         
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                # announces blocks found by the pool on the network's IRC channel
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    self.in_channel = False
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        # announce a fresh block-solving share after a random
                        # delay (mean 60s), presumably so multiple nodes don't
                        # all announce at once -- TODO confirm; duplicates are
                        # suppressed via recent_messages
                        if not self.in_channel:
                            return
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def joined(self, channel):
                    self.in_channel = True
                def left(self, channel):
                    self.in_channel = False
                def _remember_message(self, message):
                    # keep only the last 100 messages for duplicate suppression
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
583         
        @defer.inlineCallbacks
        def status_thread():
            # Periodic console status reporter: every 3 seconds build a
            # multi-line summary of chain/peer/hashrate state and print it,
            # but only when it changed or at least 15s elapsed since the
            # last print (to avoid log spam).
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    # Line 1: share-chain height, verified/total shares, peer counts
                    # (plus reactor FD counts when debugging).
                    height = tracker.get_height(best_share_var.value)
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.items),
                        len(tracker.items),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    # Line 2: local hashrate over the recent window, DOA
                    # confidence interval, and expected time to find a share.
                    datums, dt = wb.local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                    )
                    
                    # Pool-wide stats need a few shares of history to be meaningful.
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
                        # Observed rate is inflated back by the stale fraction
                        # to estimate the pool's true attempt rate.
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
                        )
                        
                        # Surface any chain/bitcoind warnings prominently on stderr.
                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    # Print on change, or every 15 seconds as a heartbeat.
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    # Status reporting must never kill the node; log and continue.
                    log.err()
        status_thread()
638     except:
639         reactor.stop()
640         log.err(None, 'Fatal error:')
641
642 def run():
643     realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
644     
645     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
646     parser.add_argument('--version', action='version', version=p2pool.__version__)
647     parser.add_argument('--net',
648         help='use specified network (default: bitcoin)',
649         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
650     parser.add_argument('--testnet',
651         help='''use the network's testnet''',
652         action='store_const', const=True, default=False, dest='testnet')
653     parser.add_argument('--debug',
654         help='enable debugging mode',
655         action='store_const', const=True, default=False, dest='debug')
656     parser.add_argument('-a', '--address',
657         help='generate payouts to this address (default: <address requested from bitcoind>)',
658         type=str, action='store', default=None, dest='address')
659     parser.add_argument('--datadir',
660         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
661         type=str, action='store', default=None, dest='datadir')
662     parser.add_argument('--logfile',
663         help='''log to this file (default: data/<NET>/log)''',
664         type=str, action='store', default=None, dest='logfile')
665     parser.add_argument('--merged',
666         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
667         type=str, action='append', default=[], dest='merged_urls')
668     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
669         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
670         type=float, action='store', default=0.5, dest='donation_percentage')
671     parser.add_argument('--iocp',
672         help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
673         action='store_true', default=False, dest='iocp')
674     parser.add_argument('--irc-announce',
675         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
676         action='store_true', default=False, dest='irc_announce')
677     parser.add_argument('--no-bugreport',
678         help='disable submitting caught exceptions to the author',
679         action='store_true', default=False, dest='no_bugreport')
680     
681     p2pool_group = parser.add_argument_group('p2pool interface')
682     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
683         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
684         type=int, action='store', default=None, dest='p2pool_port')
685     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
686         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
687         type=str, action='append', default=[], dest='p2pool_nodes')
688     parser.add_argument('--disable-upnp',
689         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
690         action='store_false', default=True, dest='upnp')
691     p2pool_group.add_argument('--max-conns', metavar='CONNS',
692         help='maximum incoming connections (default: 40)',
693         type=int, action='store', default=40, dest='p2pool_conns')
694     
695     worker_group = parser.add_argument_group('worker interface')
696     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
697         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
698         type=str, action='store', default=None, dest='worker_endpoint')
699     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
700         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
701         type=float, action='store', default=0, dest='worker_fee')
702     
703     bitcoind_group = parser.add_argument_group('bitcoind interface')
704     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
705         help='connect to this address (default: 127.0.0.1)',
706         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
707     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
708         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
709         type=int, action='store', default=None, dest='bitcoind_rpc_port')
710     bitcoind_group.add_argument('--bitcoind-rpc-ssl',
711         help='connect to JSON-RPC interface using SSL',
712         action='store_true', default=False, dest='bitcoind_rpc_ssl')
713     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
714         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
715         type=int, action='store', default=None, dest='bitcoind_p2p_port')
716     
717     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
718         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
719         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
720     
721     args = parser.parse_args()
722     
723     if args.debug:
724         p2pool.DEBUG = True
725         defer.setDebugging(True)
726     
727     net_name = args.net_name + ('_testnet' if args.testnet else '')
728     net = networks.nets[net_name]
729     
730     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
731     if not os.path.exists(datadir_path):
732         os.makedirs(datadir_path)
733     
734     if len(args.bitcoind_rpc_userpass) > 2:
735         parser.error('a maximum of two arguments are allowed')
736     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
737     
738     if args.bitcoind_rpc_password is None:
739         conf_path = net.PARENT.CONF_FILE_FUNC()
740         if not os.path.exists(conf_path):
741             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
742                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
743                 '''\r\n'''
744                 '''server=1\r\n'''
745                 '''rpcpassword=%x\r\n'''
746                 '''\r\n'''
747                 '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
748         conf = open(conf_path, 'rb').read()
749         contents = {}
750         for line in conf.splitlines(True):
751             if '#' in line:
752                 line = line[:line.index('#')]
753             if '=' not in line:
754                 continue
755             k, v = line.split('=', 1)
756             contents[k.strip()] = v.strip()
757         for conf_name, var_name, var_type in [
758             ('rpcuser', 'bitcoind_rpc_username', str),
759             ('rpcpassword', 'bitcoind_rpc_password', str),
760             ('rpcport', 'bitcoind_rpc_port', int),
761             ('port', 'bitcoind_p2p_port', int),
762         ]:
763             if getattr(args, var_name) is None and conf_name in contents:
764                 setattr(args, var_name, var_type(contents[conf_name]))
765         if args.bitcoind_rpc_password is None:
766             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
767     
768     if args.bitcoind_rpc_username is None:
769         args.bitcoind_rpc_username = ''
770     
771     if args.bitcoind_rpc_port is None:
772         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
773     
774     if args.bitcoind_p2p_port is None:
775         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
776     
777     if args.p2pool_port is None:
778         args.p2pool_port = net.P2P_PORT
779     
780     if args.worker_endpoint is None:
781         worker_endpoint = '', net.WORKER_PORT
782     elif ':' not in args.worker_endpoint:
783         worker_endpoint = '', int(args.worker_endpoint)
784     else:
785         addr, port = args.worker_endpoint.rsplit(':', 1)
786         worker_endpoint = addr, int(port)
787     
788     if args.address is not None:
789         try:
790             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
791         except Exception, e:
792             parser.error('error parsing address: ' + repr(e))
793     else:
794         args.pubkey_hash = None
795     
796     def separate_url(url):
797         s = urlparse.urlsplit(url)
798         if '@' not in s.netloc:
799             parser.error('merged url netloc must contain an "@"')
800         userpass, new_netloc = s.netloc.rsplit('@', 1)
801         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
802     merged_urls = map(separate_url, args.merged_urls)
803     
804     if args.logfile is None:
805         args.logfile = os.path.join(datadir_path, 'log')
806     
807     logfile = logging.LogFile(args.logfile)
808     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
809     sys.stdout = logging.AbortPipe(pipe)
810     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
811     if hasattr(signal, "SIGUSR1"):
812         def sigusr1(signum, frame):
813             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
814             logfile.reopen()
815             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
816         signal.signal(signal.SIGUSR1, sigusr1)
817     task.LoopingCall(logfile.reopen).start(5)
818     
819     class ErrorReporter(object):
820         def __init__(self):
821             self.last_sent = None
822         
823         def emit(self, eventDict):
824             if not eventDict["isError"]:
825                 return
826             
827             if self.last_sent is not None and time.time() < self.last_sent + 5:
828                 return
829             self.last_sent = time.time()
830             
831             if 'failure' in eventDict:
832                 text = ((eventDict.get('why') or 'Unhandled Error')
833                     + '\n' + eventDict['failure'].getTraceback())
834             else:
835                 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
836             
837             from twisted.web import client
838             client.getPage(
839                 url='http://u.forre.st/p2pool_error.cgi',
840                 method='POST',
841                 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
842                 timeout=15,
843             ).addBoth(lambda x: None)
844     if not args.no_bugreport:
845         log.addObserver(ErrorReporter().emit)
846     
847     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
848     reactor.run()