from __future__ import division

import ConfigParser
import StringIO
import base64
import json
import os
import random
import sys
import time
import signal
import traceback
import urlparse

if '--iocp' in sys.argv:
    from twisted.internet import iocpreactor
    iocpreactor.install()
from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import expiring_dict, fixargparse, jsonrpc, variable, deferral, math, logging
from . import p2p, networks, web, work
import p2pool, p2pool.data as p2pool_data

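# getwork: fetch a block template from bitcoind over JSON-RPC, trying
# getmemorypool or getblocktemplate and switching to the other if bitcoind
# reports "method not found", then normalize the result into the dict used
# throughout the rest of this file.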
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, use_getblocktemplate=False):
    def go():
        if use_getblocktemplate:
            return bitcoind.rpc_getblocktemplate({})
        else:
            return bitcoind.rpc_getmemorypool()
    try:
        work = yield go()
    except jsonrpc.Error_for_code(-32601): # Method not found
        use_getblocktemplate = not use_getblocktemplate
        try:
            work = yield go()
        except jsonrpc.Error_for_code(-32601): # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
    if 'height' not in work:
        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    elif p2pool.DEBUG:
        assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    defer.returnValue(dict(
        version=work['version'],
        previous_block=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        merkle_link=bitcoin_data.calculate_merkle_link([None] + map(bitcoin_data.hash256, packed_transactions), 0),
        subsidy=work['coinbasevalue'],
        time=work['time'] if 'time' in work else work['curtime'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
        height=work['height'],
        last_update=time.time(),
        use_getblocktemplate=use_getblocktemplate,
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            temp_work = yield getwork(bitcoind)
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version'], temp_work):
                print >>sys.stderr, '    Bitcoin version too old! BIP16 support required! Upgrade to 0.6.0rc4 or greater!'
                raise deferral.RetrySilentlyException()
            defer.returnValue(temp_work)
        temp_work = yield check()
        
        block_height_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_height():
            block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
        yield poll_height()
        task.LoopingCall(poll_height).start(60*60)
        
        bitcoind_warning_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_warnings():
            errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
            bitcoind_warning_var.set(errors if errors != '' else None)
        yield poll_warnings()
        task.LoopingCall(poll_warnings).start(20*60)
        
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block'],)
        print '    Current block height: %i' % (block_height_var.value,)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.items:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.items) % 1000 == 0 and tracker.items:
                    print "    %i" % (len(tracker.items),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.items:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.items[h])
        print "    ...done loading %i shares!" % (len(tracker.items),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        print 'Initializing work...'
        
        
        # BITCOIND WORK
        
        bitcoind_work = variable.Variable((yield getwork(bitcoind)))
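        # Re-fetch work in a loop: wake up when bitcoind's P2P connection
        # announces a new block, or after 15 seconds, whichever comes first.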
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        # PEER WORK
        
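        # best_block_header tracks the most recent valid block header seen,
        # either polled from bitcoind or relayed by peers via handle_bestblock,
        # as long as it is consistent with bitcoind's current tip.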
        best_block_header = variable.Variable(None)
        def handle_header(new_header):
            # check that header matches current target
            if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
                return
            bitcoind_best_block = bitcoind_work.value['previous_block']
            if (best_block_header.value is None
                or (
                    new_header['previous_block'] == bitcoind_best_block and
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
                ) # new is child of current and previous is current
                or (
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
                    best_block_header.value['previous_block'] != bitcoind_best_block
                )): # new is current and previous is not a child of current
                best_block_header.set(new_header)
        @defer.inlineCallbacks
        def poll_header():
            handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
        bitcoind_work.changed.watch(lambda _: poll_header())
        yield deferral.retry('Error while requesting best block header:')(poll_header)()
        
        # BEST SHARE
        
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
        requested = expiring_dict.ExpiringDict(300)
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
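        # set_best_share asks the tracker for the best share chain head given
        # bitcoind's current tip, publishes it in best_share_var, and requests
        # any desired-but-missing parent shares from peers, backing off
        # exponentially on shares that were already requested recently.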
        best_share_var = variable.Variable(None)
        def set_best_share():
            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
            
            best_share_var.set(best)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        bitcoind_work.changed.watch(lambda _: set_best_share())
        set_best_share()
        
        
        print '    ...success!'
        print
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.items:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_best_share()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.items:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
            
            def handle_bestblock(self, header, peer):
                if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
                    raise p2p.PeerMisbehavingError('received block header fails PoW test')
                handle_header(header)
        
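        # A solved block is handed to bitcoind through both channels: pushed
        # over the P2P connection and submitted over JSON-RPC (getblocktemplate
        # with data, or getmemorypool on older nodes).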
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%32x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
        
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            if bitcoind_work.value['use_getblocktemplate']:
                success = yield bitcoind.rpc_getblocktemplate(dict(data=bitcoin_data.block_type.pack(block).encode('hex')))
            else:
                success = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s Expected: %s' % (success, success_expected)
        
        def submit_block(block, ignore_failure):
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
        
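        # Whenever a newly verified share also meets the Bitcoin block target,
        # it is a full block: submit it to bitcoind and rebroadcast the share.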
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
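        # parse() turns a 'host' or 'host:port' string into a resolved
        # (ip, port) tuple, defaulting to the network's p2pool P2P port.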
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs')):
            try:
                with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
                    addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
            except:
                print >>sys.stderr, 'error parsing addrs'
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: best_share_var.value,
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
        )
        p2p_node.start()
        
        def save_addrs():
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
        task.LoopingCall(save_addrs).start(60)
        
        @best_block_header.changed.watch
        def _(header):
            for peer in p2p_node.peers.itervalues():
                peer.send_bestblock(header=header)
        
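        # broadcast_share collects up to five shares ending at share_hash,
        # stopping at the first one already marked as shared, and sends them to
        # every connected peer except the one each share arrived from.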
        @defer.inlineCallbacks
        def broadcast_share(share_hash):
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in list(p2p_node.peers.itervalues()):
                yield peer.sendShares([share for share in shares if share.peer is not peer])
        
        # broadcast the new best share (and any unshared ancestors) whenever the best share chain changes
        best_share_var.changed.watch(broadcast_share)
        
        def save_shares():
            for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.items:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
        
        wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
        web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var)
        worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
        print
        
        
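        # Watchdog: re-arm a 30-second SIGALRM every second; if the reactor
        # stalls long enough for the alarm to fire, the handler dumps the
        # current stack trace to stderr.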
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
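        # Every three seconds, build a status line (chain height, peer counts,
        # local and pool hash rates, stale rates, expected payout) and print it
        # whenever it changes, or at least every 15 seconds.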
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    height = tracker.get_height(best_share_var.value)
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.items),
                        len(tracker.items),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = wb.local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
                        )
                        
                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        reactor.stop()
        log.err(None, 'Fatal error:')

def run():
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--iocp',
        help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
        action='store_true', default=False, dest='iocp')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    parser.add_argument('--no-bugreport',
        help='disable submitting caught exceptions to the author',
        action='store_true', default=False, dest='no_bugreport')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
        defer.setDebugging(True)
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
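    # If no RPC password was given on the command line, fall back to reading
    # rpcuser/rpcpassword (and rpcport/port, if still unset) from bitcoin.conf.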
    if args.bitcoind_rpc_password is None:
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x\r\n'''
                '''\r\n'''
                '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
        with open(conf_path, 'rb') as f:
            cp = ConfigParser.RawConfigParser()
            cp.readfp(StringIO.StringIO('[x]\r\n' + f.read()))
            for conf_name, var_name, var_type in [
                ('rpcuser', 'bitcoind_rpc_username', str),
                ('rpcpassword', 'bitcoind_rpc_password', str),
                ('rpcport', 'bitcoind_rpc_port', int),
                ('port', 'bitcoind_p2p_port', int),
            ]:
                if getattr(args, var_name) is None and cp.has_option('x', conf_name):
                    setattr(args, var_name, var_type(cp.get('x', conf_name)))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
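    # Route stdout and stderr through a timestamping tee: everything is echoed
    # to the original stderr and appended to the logfile, which is reopened
    # every 5 seconds and on SIGUSR1 (presumably so external log rotation is
    # picked up).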
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
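    # Unless --no-bugreport is given, log observer errors are rate-limited to
    # one every 5 seconds and POSTed to the author's error-collection endpoint.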
    class ErrorReporter(object):
        def __init__(self):
            self.last_sent = None
        
        def emit(self, eventDict):
            if not eventDict["isError"]:
                return
            
            if self.last_sent is not None and time.time() < self.last_sent + 5:
                return
            self.last_sent = time.time()
            
            if 'failure' in eventDict:
                text = ((eventDict.get('why') or 'Unhandled Error')
                    + '\n' + eventDict['failure'].getTraceback())
            else:
                text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
            
            from twisted.web import client
            client.getPage(
                url='http://u.forre.st/p2pool_error.cgi',
                method='POST',
                postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
                timeout=15,
            ).addBoth(lambda x: None)
    if not args.no_bugreport:
        log.addObserver(ErrorReporter().emit)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()