don't send irc messages if not in #p2pool channel
[p2pool.git] / p2pool / main.py
from __future__ import division

import base64
import json
import os
import random
import sys
import time
import signal
import traceback
import urlparse

if '--iocp' in sys.argv:
    from twisted.internet import iocpreactor
    iocpreactor.install()
from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import fixargparse, jsonrpc, variable, deferral, math, logging
from . import p2p, networks, web, work
import p2pool, p2pool.data as p2pool_data

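# Fetch a block template from bitcoind. Uses getblocktemplate or getmemorypool
# (depending on use_getblocktemplate) and transparently switches to the other call
# if bitcoind answers "Method not found" (-32601).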
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, use_getblocktemplate=False):
    def go():
        if use_getblocktemplate:
            return bitcoind.rpc_getblocktemplate(dict(mode='template'))
        else:
            return bitcoind.rpc_getmemorypool()
    try:
        work = yield go()
    except jsonrpc.Error_for_code(-32601): # Method not found
        use_getblocktemplate = not use_getblocktemplate
        try:
            work = yield go()
        except jsonrpc.Error_for_code(-32601): # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
    if 'height' not in work:
        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    elif p2pool.DEBUG:
        assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    defer.returnValue(dict(
        version=work['version'],
        previous_block=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        subsidy=work['coinbasevalue'],
        time=work['time'] if 'time' in work else work['curtime'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
        height=work['height'],
        last_update=time.time(),
        use_getblocktemplate=use_getblocktemplate,
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        traffic_happened = variable.Event()
        
        @defer.inlineCallbacks
        def connect_p2p():
            # connect to bitcoind over bitcoin-p2p
            print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
            factory = bitcoin_p2p.ClientFactory(net.PARENT)
            reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
            yield factory.getProtocol() # waits until handshake is successful
            print '    ...success!'
            print
            defer.returnValue(factory)
        
        if args.testnet: # establish p2p connection first if testnet so bitcoind can work without connections
            factory = yield connect_p2p()
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
                print >>sys.stderr, '    Bitcoin version too old! Upgrade to 0.6.4 or newer!'
                raise deferral.RetrySilentlyException()
        yield check()
        temp_work = yield getwork(bitcoind)
        
        if not args.testnet:
            factory = yield connect_p2p()
        
        block_height_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_height():
            block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
        yield poll_height()
        task.LoopingCall(poll_height).start(60*60)
        
        bitcoind_warning_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_warnings():
            errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
            bitcoind_warning_var.set(errors if errors != '' else None)
        yield poll_warnings()
        task.LoopingCall(poll_warnings).start(20*60)
        
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block'],)
        print '    Current block height: %i' % (block_height_var.value,)
        print
        
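        # Payout address selection: use -a/--address if given; otherwise reuse the
        # cached address from disk (revalidating it with bitcoind) or ask bitcoind
        # for a fresh address from its 'p2pool' account.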
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
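        # Share-chain state: replay shares previously saved to disk into the tracker,
        # then re-mark the ones that had already been verified.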
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.items:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.items) % 1000 == 0 and tracker.items:
                    print "    %i" % (len(tracker.items),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.items:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.items[h])
        print "    ...done loading %i shares!" % (len(tracker.items),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        print 'Initializing work...'
        
        
        # BITCOIND WORK
        
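        # bitcoind_work always holds the latest block template; work_poller refreshes
        # it every 15 seconds, or immediately when bitcoind announces a new block over
        # the P2P connection.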
        bitcoind_work = variable.Variable((yield getwork(bitcoind)))
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        # PEER WORK
        
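        # best_block_header tracks the best block header known that is either bitcoind's
        # current tip or a child of it; it is fed by headers relayed by peers (see
        # handle_bestblock below) and by polling bitcoind whenever its work changes.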
        best_block_header = variable.Variable(None)
        def handle_header(new_header):
            # check that header matches current target
            if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
                return
            bitcoind_best_block = bitcoind_work.value['previous_block']
            if (best_block_header.value is None
                or (
                    new_header['previous_block'] == bitcoind_best_block and
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
                ) # new is child of current and previous is current
                or (
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
                    best_block_header.value['previous_block'] != bitcoind_best_block
                )): # new is current and previous is not a child of current
                best_block_header.set(new_header)
        @defer.inlineCallbacks
        def poll_header():
            handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
        bitcoind_work.changed.watch(lambda _: poll_header())
        yield deferral.retry('Error while requesting best block header:')(poll_header)()
        
        # BEST SHARE
        
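        # set_best_share re-runs tracker.think() against bitcoind's current tip: it picks
        # the best verified share to mine on and records which missing shares are still
        # wanted from peers (consumed by download_shares below).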
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
        
        best_share_var = variable.Variable(None)
        desired_var = variable.Variable(None)
        def set_best_share():
            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
            
            best_share_var.set(best)
            desired_var.set(desired)
        bitcoind_work.changed.watch(lambda _: set_best_share())
        set_best_share()
        
        print '    ...success!'
        print
        
        # setup p2p logic and join p2pool network
        
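        # Node wires the share chain into the generic p2p layer: shares received from
        # peers are added to the tracker, requested ancestor shares are served back,
        # and peers' best-block headers are PoW-checked before being accepted.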
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.items:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if new_count:
                    set_best_share()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
            
            @defer.inlineCallbacks
            def handle_share_hashes(self, hashes, peer):
                new_hashes = [x for x in hashes if x not in tracker.items]
                if not new_hashes:
                    return
                try:
                    shares = yield peer.get_shares(
                        hashes=new_hashes,
                        parents=0,
                        stops=[],
                    )
                except:
                    log.err(None, 'in handle_share_hashes:')
                else:
                    self.handle_shares(shares, peer)
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
            
            def handle_bestblock(self, header, peer):
                if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
                    raise p2p.PeerMisbehavingError('received block header fails PoW test')
                handle_header(header)
        
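        # Found blocks are submitted over both paths: the bitcoind P2P connection
        # (submit_block_p2p) and JSON-RPC (submitblock, or getmemorypool in the old
        # mode), so a failure of one path does not lose the block.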
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
        
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            if bitcoind_work.value['use_getblocktemplate']:
                result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result is None
            else:
                result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
        
        def submit_block(block, ignore_failure):
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
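        # Peer addresses are persisted in <datadir>/addrs as JSON and seeded from
        # net.BOOTSTRAP_ADDRS; nodes given with -n/--p2pool-node are always connected to.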
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs')):
            try:
                with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
                    addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
            except:
                print >>sys.stderr, 'error parsing addrs'
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: best_share_var.value,
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
            traffic_happened=traffic_happened,
        )
        p2p_node.start()
        
        def save_addrs():
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
        task.LoopingCall(save_addrs).start(60)
        
        @best_block_header.changed.watch
        def _(header):
            for peer in p2p_node.peers.itervalues():
                peer.send_bestblock(header=header)
        
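        # broadcast_share pushes a share and up to four of its not-yet-shared ancestors
        # to every connected peer except the one it was received from.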
        @defer.inlineCallbacks
        def broadcast_share(share_hash):
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in list(p2p_node.peers.itervalues()):
                yield peer.sendShares([share for share in shares if share.peer is not peer])
        
        # send share when the chain changes to their chain
        best_share_var.changed.watch(broadcast_share)
        
        def save_shares():
            for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.items:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
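        # Background loop: whenever tracker.think() reports missing ancestor shares
        # (via desired_var), pick one at random and request it, plus up to 500 parents,
        # from a randomly chosen peer.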
        @apply
        @defer.inlineCallbacks
        def download_shares():
            while True:
                desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
                peer2, share_hash = random.choice(desired)
                
                if len(p2p_node.peers) == 0:
                    yield deferral.sleep(1)
                    continue
                peer = random.choice(p2p_node.peers.values())
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                try:
                    shares = yield peer.get_shares(
                        hashes=[share_hash],
                        parents=500,
                        stops=[],
                    )
                except:
                    log.err(None, 'in download_shares:')
                    continue
                
                if not shares:
                    yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
                    continue
                p2p_node.handle_shares(shares, peer)
        
        print '    ...success!'
        print
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
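        # WorkerBridge generates work for miners; it is exposed through WorkerInterface
        # on worker_endpoint, and the web UI/statistics are served from the same port.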
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
        
        wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
        web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened, args.donation_percentage)
        worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
        print
        
        
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
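        # Optional IRC announcer (--irc-announce): joins net.ANNOUNCE_CHANNEL on
        # irc.freenode.net and announces blocks found by the pool. Announcements are
        # only sent while actually joined to the channel (in_channel), and recently
        # seen messages are remembered so the same block is not announced twice.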
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    self.in_channel = False
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        if not self.in_channel:
                            return
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def joined(self, channel):
                    self.in_channel = True
                def left(self, channel):
                    self.in_channel = False
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
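        # Console status loop: every few seconds, print the share-chain length, peer
        # counts, local and pool hash rates, stale rates, expected times to share and
        # block, current payout, and any warnings from bitcoind or the share chain.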
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    height = tracker.get_height(best_share_var.value)
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.items),
                        len(tracker.items),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = wb.local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
                        )
                        
                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        reactor.stop()
        log.err(None, 'Fatal error:')

def run():
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--iocp',
        help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
        action='store_true', default=False, dest='iocp')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    parser.add_argument('--no-bugreport',
        help='disable submitting caught exceptions to the author',
        action='store_true', default=False, dest='no_bugreport')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-rpc-ssl',
        help='connect to JSON-RPC interface using SSL',
        action='store_true', default=False, dest='bitcoind_rpc_ssl')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
        defer.setDebugging(True)
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
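    # If no RPC password was given on the command line, fall back to reading rpcuser,
    # rpcpassword, rpcport and port from the parent coin's bitcoin.conf.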
    if args.bitcoind_rpc_password is None:
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x\r\n'''
                '''\r\n'''
                '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
        conf = open(conf_path, 'rb').read()
        contents = {}
        for line in conf.splitlines(True):
            if '#' in line:
                line = line[:line.index('#')]
            if '=' not in line:
                continue
            k, v = line.split('=', 1)
            contents[k.strip()] = v.strip()
        for conf_name, var_name, var_type in [
            ('rpcuser', 'bitcoind_rpc_username', str),
            ('rpcpassword', 'bitcoind_rpc_password', str),
            ('rpcport', 'bitcoind_rpc_port', int),
            ('port', 'bitcoind_p2p_port', int),
        ]:
            if getattr(args, var_name) is None and conf_name in contents:
                setattr(args, var_name, var_type(contents[conf_name]))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
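    # Route stdout/stderr through a timestamping tee so output goes both to the console
    # and to the log file; SIGUSR1 (where available) reopens the log, which plays well
    # with external log rotation.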
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
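    # Unless --no-bugreport is given, uncaught errors are reported to the author's
    # server, rate-limited to at most one report every five seconds.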
    class ErrorReporter(object):
        def __init__(self):
            self.last_sent = None
        
        def emit(self, eventDict):
            if not eventDict["isError"]:
                return
            
            if self.last_sent is not None and time.time() < self.last_sent + 5:
                return
            self.last_sent = time.time()
            
            if 'failure' in eventDict:
                text = ((eventDict.get('why') or 'Unhandled Error')
                    + '\n' + eventDict['failure'].getTraceback())
            else:
                text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
            
            from twisted.web import client
            client.getPage(
                url='http://u.forre.st/p2pool_error.cgi',
                method='POST',
                postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
                timeout=15,
            ).addBoth(lambda x: None)
    if not args.no_bugreport:
        log.addObserver(ErrorReporter().emit)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()