tx preforwarding working
[p2pool.git] / p2pool / main.py
from __future__ import division

import base64
import json
import os
import random
import sys
import time
import signal
import traceback
import urlparse

if '--iocp' in sys.argv:
    from twisted.internet import iocpreactor
    iocpreactor.install()
from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import fixargparse, jsonrpc, variable, deferral, math, logging
from . import p2p, networks, web, work
import p2pool, p2pool.data as p2pool_data

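# Fetch a block template from bitcoind, preferring getblocktemplate and falling
# back to the older getmemorypool call if the first method is not recognized.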
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, use_getblocktemplate=False):
    def go():
        if use_getblocktemplate:
            return bitcoind.rpc_getblocktemplate(dict(mode='template'))
        else:
            return bitcoind.rpc_getmemorypool()
    try:
        work = yield go()
    except jsonrpc.Error_for_code(-32601): # Method not found
        use_getblocktemplate = not use_getblocktemplate
        try:
            work = yield go()
        except jsonrpc.Error_for_code(-32601): # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
    if 'height' not in work:
        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    elif p2pool.DEBUG:
        assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    defer.returnValue(dict(
        version=work['version'],
        previous_block=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        subsidy=work['coinbasevalue'],
        time=work['time'] if 'time' in work else work['curtime'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
        height=work['height'],
        last_update=time.time(),
        use_getblocktemplate=use_getblocktemplate,
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        traffic_happened = variable.Event()
        
        @defer.inlineCallbacks
        def connect_p2p():
            # connect to bitcoind over bitcoin-p2p
            print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
            factory = bitcoin_p2p.ClientFactory(net.PARENT)
            reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
            yield factory.getProtocol() # waits until handshake is successful
            print '    ...success!'
            print
            defer.returnValue(factory)
        
        if args.testnet: # establish p2p connection first if testnet so bitcoind can work without connections
            factory = yield connect_p2p()
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
                print >>sys.stderr, '    Bitcoin version too old! Upgrade to 0.6.4 or newer!'
                raise deferral.RetrySilentlyException()
        yield check()
        temp_work = yield getwork(bitcoind)
        
        if not args.testnet:
            factory = yield connect_p2p()
        
        block_height_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_height():
            block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
        yield poll_height()
        task.LoopingCall(poll_height).start(60*60)
        
        bitcoind_warning_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_warnings():
            errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
            bitcoind_warning_var.set(errors if errors != '' else None)
        yield poll_warnings()
        task.LoopingCall(poll_warnings).start(20*60)
        
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block'],)
        print '    Current block height: %i' % (block_height_var.value,)
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.items:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.items) % 1000 == 0 and tracker.items:
                    print "    %i" % (len(tracker.items),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.items:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.items[h])
        print "    ...done loading %i shares!" % (len(tracker.items),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        print 'Initializing work...'
        
        
        # BITCOIND WORK
        
        bitcoind_work = variable.Variable((yield getwork(bitcoind)))
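        # Re-poll bitcoind for fresh work roughly every 15 seconds, or immediately
        # once the P2P factory signals that a new block has arrived.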
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        # PEER WORK
        
        best_block_header = variable.Variable(None)
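        # Track the best block header seen so far. handle_header() rejects headers
        # that fail the current network target, then keeps whichever header is most
        # useful relative to bitcoind's current best block (see the checks below).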
        def handle_header(new_header):
            # check that header matches current target
            if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
                return
            bitcoind_best_block = bitcoind_work.value['previous_block']
            if (best_block_header.value is None
                or (
                    new_header['previous_block'] == bitcoind_best_block and
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
                ) # new is child of current and previous is current
                or (
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
                    best_block_header.value['previous_block'] != bitcoind_best_block
                )): # new is current and previous is not a child of current
                best_block_header.set(new_header)
        @defer.inlineCallbacks
        def poll_header():
            handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
        bitcoind_work.changed.watch(lambda _: poll_header())
        yield deferral.retry('Error while requesting best block header:')(poll_header)()
        
        # BEST SHARE
        
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
        
        best_share_var = variable.Variable(None)
        desired_var = variable.Variable(None)
        def set_best_share():
            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
            
            best_share_var.set(best)
            desired_var.set(desired)
        bitcoind_work.changed.watch(lambda _: set_best_share())
        set_best_share()
        
        print '    ...success!'
        print
        
        # setup p2p logic and join p2pool network
        
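        # Transaction maps handed to the P2P node below: mining_txs_var is rebuilt
        # from each new block template, while known_txs_var is left to the p2p code
        # to populate with transactions seen on the network (the "tx preforwarding"
        # this commit refers to).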
        known_txs_var = variable.Variable({}) # hash -> tx
        mining_txs_var = variable.Variable({}) # hash -> tx
        @bitcoind_work.changed.watch
        def _(work):
            new_mining_txs = {}
            for tx in work['transactions']:
                new_mining_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
            mining_txs_var.set(new_mining_txs)
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.items:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if new_count:
                    set_best_share()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
            
            @defer.inlineCallbacks
            def handle_share_hashes(self, hashes, peer):
                new_hashes = [x for x in hashes if x not in tracker.items]
                if not new_hashes:
                    return
                try:
                    shares = yield peer.get_shares(
                        hashes=new_hashes,
                        parents=0,
                        stops=[],
                    )
                except:
                    log.err(None, 'in handle_share_hashes:')
                else:
                    self.handle_shares(shares, peer)
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
            
            def handle_bestblock(self, header, peer):
                if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
                    raise p2p.PeerMisbehavingError('received block header fails PoW test')
                handle_header(header)
        
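        # Found blocks are submitted two ways: over the bitcoin P2P connection and
        # over JSON-RPC (submitblock when getblocktemplate is in use, otherwise the
        # getmemorypool form), with mismatched results reported to stderr.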
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
        
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            if bitcoind_work.value['use_getblocktemplate']:
                result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result is None
            else:
                result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
        
        def submit_block(block, ignore_failure):
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs')):
            try:
                with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
                    addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
            except:
                print >>sys.stderr, 'error parsing addrs'
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: best_share_var.value,
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
            traffic_happened=traffic_happened,
            known_txs_var=known_txs_var,
            mining_txs_var=mining_txs_var,
        )
        p2p_node.start()
        
        def save_addrs():
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
        task.LoopingCall(save_addrs).start(60)
        
        @best_block_header.changed.watch
        def _(header):
            for peer in p2p_node.peers.itervalues():
                peer.send_bestblock(header=header)
        
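        # Send a share (and up to four not-yet-shared ancestors) to every peer other
        # than the one it came from.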
        @defer.inlineCallbacks
        def broadcast_share(share_hash):
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in list(p2p_node.peers.itervalues()):
                yield peer.sendShares([share for share in shares if share.peer is not peer])
        
        # broadcast our new best share whenever the head of the share chain changes
        best_share_var.changed.watch(broadcast_share)
        
        def save_shares():
            for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.items:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
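        # Background loop: whenever the tracker reports missing parent shares via
        # desired_var, request them (with up to 500 parents) from a random peer.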
        @apply
        @defer.inlineCallbacks
        def download_shares():
            while True:
                desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
                peer2, share_hash = random.choice(desired)
                
                if len(p2p_node.peers) == 0:
                    yield deferral.sleep(1)
                    continue
                peer = random.choice(p2p_node.peers.values())
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                try:
                    shares = yield peer.get_shares(
                        hashes=[share_hash],
                        parents=500,
                        stops=[],
                    )
                except:
                    log.err(None, 'in download_shares:')
                    continue
                
                if not shares:
                    yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
                    continue
                p2p_node.handle_shares(shares, peer)
        
        print '    ...success!'
        print
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
        
        wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
        web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened)
        worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
        print
        
        
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
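        # Every few seconds, print a status line covering the share chain, peer
        # counts, local/pool hash rates, stale rates and expected time to share/block.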
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    height = tracker.get_height(best_share_var.value)
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.items),
                        len(tracker.items),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = wb.local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
                        )
                        
                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        reactor.stop()
        log.err(None, 'Fatal error:')

def run():
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--iocp',
        help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
        action='store_true', default=False, dest='iocp')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    parser.add_argument('--no-bugreport',
        help='disable submitting caught exceptions to the author',
        action='store_true', default=False, dest='no_bugreport')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-rpc-ssl',
        help='connect to JSON-RPC interface using SSL',
        action='store_true', default=False, dest='bitcoind_rpc_ssl')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
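    # Illustrative invocation (placeholder values, not defaults):
    #   python run_p2pool.py --net bitcoin -a <payout-address> <rpcuser> <rpcpassword>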
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
        defer.setDebugging(True)
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
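    # If no RPC password was given on the command line, fall back to bitcoin.conf
    # for rpcuser/rpcpassword and, when still unset, rpcport/port.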
    if args.bitcoind_rpc_password is None:
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x\r\n'''
                '''\r\n'''
                '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
        conf = open(conf_path, 'rb').read()
        contents = {}
        for line in conf.splitlines(True):
            if '#' in line:
                line = line[:line.index('#')]
            if '=' not in line:
                continue
            k, v = line.split('=', 1)
            contents[k.strip()] = v.strip()
        for conf_name, var_name, var_type in [
            ('rpcuser', 'bitcoind_rpc_username', str),
            ('rpcpassword', 'bitcoind_rpc_password', str),
            ('rpcport', 'bitcoind_rpc_port', int),
            ('port', 'bitcoind_p2p_port', int),
        ]:
            if getattr(args, var_name) is None and conf_name in contents:
                setattr(args, var_name, var_type(contents[conf_name]))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
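    # Merged-mining URLs carry credentials inline; split each one into a
    # (url-without-credentials, userpass) pair.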
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
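    # Tee stdout/stderr through a timestamping pipe into both the console and the
    # log file; SIGUSR1 (where available) and a periodic LoopingCall reopen the file.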
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    class ErrorReporter(object):
        def __init__(self):
            self.last_sent = None
        
        def emit(self, eventDict):
            if not eventDict["isError"]:
                return
            
            if self.last_sent is not None and time.time() < self.last_sent + 5:
                return
            self.last_sent = time.time()
            
            if 'failure' in eventDict:
                text = ((eventDict.get('why') or 'Unhandled Error')
                    + '\n' + eventDict['failure'].getTraceback())
            else:
                text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
            
            from twisted.web import client
            client.getPage(
                url='http://u.forre.st/p2pool_error.cgi',
                method='POST',
                postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
                timeout=15,
            ).addBoth(lambda x: None)
    if not args.no_bugreport:
        log.addObserver(ErrorReporter().emit)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()