from __future__ import division

import base64
import json
import os
import random
import sys
import time
import signal
import traceback
import urlparse

if '--iocp' in sys.argv:
    from twisted.internet import iocpreactor
    iocpreactor.install()
from twisted.internet import defer, reactor, protocol, task
from twisted.web import server
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
from bitcoin import worker_interface, height_tracker
from util import fixargparse, jsonrpc, variable, deferral, math, logging
from . import p2p, networks, web, work
import p2pool, p2pool.data as p2pool_data

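# getwork() asks bitcoind for a block template over JSON-RPC, preferring
# getblocktemplate and falling back to the legacy getmemorypool call when the
# method is missing, then normalizes the result (decoded transactions, height,
# bits, coinbase flags) into the dict the rest of p2pool works with.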
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, use_getblocktemplate=False):
    def go():
        if use_getblocktemplate:
            return bitcoind.rpc_getblocktemplate(dict(mode='template'))
        else:
            return bitcoind.rpc_getmemorypool()
    try:
        work = yield go()
    except jsonrpc.Error_for_code(-32601): # Method not found
        use_getblocktemplate = not use_getblocktemplate
        try:
            work = yield go()
        except jsonrpc.Error_for_code(-32601): # Method not found
            print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
            raise deferral.RetrySilentlyException()
    packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
    if 'height' not in work:
        work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    elif p2pool.DEBUG:
        assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
    defer.returnValue(dict(
        version=work['version'],
        previous_block=int(work['previousblockhash'], 16),
        transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
        subsidy=work['coinbasevalue'],
        time=work['time'] if 'time' in work else work['curtime'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
        coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
        height=work['height'],
        last_update=time.time(),
        use_getblocktemplate=use_getblocktemplate,
    ))

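# main() is the long-running entry point: it connects to bitcoind over both
# JSON-RPC and the bitcoin P2P protocol, loads the local share chain from disk,
# joins the p2pool P2P network, starts the worker (miner-facing) RPC server,
# and then runs the periodic status/maintenance loops until the reactor stops.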
@defer.inlineCallbacks
def main(args, net, datadir_path, merged_urls, worker_endpoint):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        
        traffic_happened = variable.Event()
        
        @defer.inlineCallbacks
        def connect_p2p():
            # connect to bitcoind over bitcoin-p2p
            print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
            factory = bitcoin_p2p.ClientFactory(net.PARENT)
            reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
            yield factory.getProtocol() # waits until handshake is successful
            print '    ...success!'
            print
            defer.returnValue(factory)
        
        if args.testnet: # establish p2p connection first if testnet so bitcoind can work without connections
            factory = yield connect_p2p()
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
        @deferral.retry('Error while checking Bitcoin connection:', 1)
        @defer.inlineCallbacks
        def check():
            if not (yield net.PARENT.RPC_CHECK(bitcoind)):
                print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
                raise deferral.RetrySilentlyException()
            if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
                print >>sys.stderr, '    Bitcoin version too old! Upgrade to 0.6.4 or newer!'
                raise deferral.RetrySilentlyException()
        yield check()
        temp_work = yield getwork(bitcoind)
        
        if not args.testnet:
            factory = yield connect_p2p()
        
        block_height_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_height():
            block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
        yield poll_height()
        task.LoopingCall(poll_height).start(60*60)
        
        bitcoind_warning_var = variable.Variable(None)
        @defer.inlineCallbacks
        def poll_warnings():
            errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
            bitcoind_warning_var.set(errors if errors != '' else None)
        yield poll_warnings()
        task.LoopingCall(poll_warnings).start(20*60)
        
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block'],)
        print '    Current block height: %i' % (block_height_var.value,)
        print
        
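        # Payout address: unless one was given with -a/--address, reuse the
        # address cached in data/<net>/cached_payout_address if bitcoind still
        # considers it valid and its own; otherwise ask bitcoind for the
        # 'p2pool' account address and cache that.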
        print 'Determining payout address...'
        if args.pubkey_hash is None:
            address_path = os.path.join(datadir_path, 'cached_payout_address')
            
            if os.path.exists(address_path):
                with open(address_path, 'rb') as f:
                    address = f.read().strip('\r\n')
                print '    Loaded cached address: %s...' % (address,)
            else:
                address = None
            
            if address is not None:
                res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
                if not res['isvalid'] or not res['ismine']:
                    print '    Cached address is either invalid or not controlled by local bitcoind!'
                    address = None
            
            if address is None:
                print '    Getting payout address from bitcoind...'
                address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
            
            with open(address_path, 'wb') as f:
                f.write(address)
            
            my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
        else:
            my_pubkey_hash = args.pubkey_hash
        print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
        print
        
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
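        # Load the share chain persisted in data/<net>/shares.* into the
        # OkayTracker, then mark previously verified shares. The watchers below
        # keep the on-disk ShareStore in sync as shares are pruned from the tracker.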
        tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.items:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.items) % 1000 == 0 and tracker.items:
                    print "    %i" % (len(tracker.items),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.items:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.items[h])
        print "    ...done loading %i shares!" % (len(tracker.items),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        print 'Initializing work...'
        
        
        # BITCOIND WORK
        
        bitcoind_work = variable.Variable((yield getwork(bitcoind)))
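        # work_poller refreshes bitcoind_work whenever bitcoind announces a new
        # block over P2P, and at least every 15 seconds regardless.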
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        # PEER WORK
        
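        # best_block_header tracks the best block header heard about (from
        # bitcoind or from peers) that matches or builds on bitcoind's current
        # tip; handle_header() only accepts headers meeting the current network
        # target.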
        best_block_header = variable.Variable(None)
        def handle_header(new_header):
            # check that header matches current target
            if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
                return
            bitcoind_best_block = bitcoind_work.value['previous_block']
            if (best_block_header.value is None
                or (
                    new_header['previous_block'] == bitcoind_best_block and
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
                ) # new is child of current and previous is current
                or (
                    bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
                    best_block_header.value['previous_block'] != bitcoind_best_block
                )): # new is current and previous is not a child of current
                best_block_header.set(new_header)
        @defer.inlineCallbacks
        def poll_header():
            handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
        bitcoind_work.changed.watch(lambda _: poll_header())
        yield deferral.retry('Error while requesting best block header:')(poll_header)()
        
        # BEST SHARE
        
        get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
        
        best_share_var = variable.Variable(None)
        desired_var = variable.Variable(None)
        def set_best_share():
            best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'])
            
            best_share_var.set(best)
            desired_var.set(desired)
        bitcoind_work.changed.watch(lambda _: set_best_share())
        set_best_share()
        
        print '    ...success!'
        print
        
        # setup p2p logic and join p2pool network
        
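        # known_txs_var caches transactions exchanged with peers and
        # mining_txs_var mirrors the transactions in the current block template;
        # any transaction newly learned from a peer is forwarded to bitcoind.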
        known_txs_var = variable.Variable({}) # hash -> tx
        mining_txs_var = variable.Variable({}) # hash -> tx
        # update mining_txs according to getwork results
        @bitcoind_work.changed.watch
        def _(work):
            new_mining_txs = {}
            for tx in work['transactions']:
                new_mining_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
            mining_txs_var.set(new_mining_txs)
        # forward transactions seen to bitcoind
        @known_txs_var.transitioned.watch
        def _(before, after):
            for tx_hash in set(after) - set(before):
                factory.conn.value.send_tx(tx=after[tx_hash])
        
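        # Node subclasses p2p.Node to handle p2pool protocol callbacks: shares
        # pushed by peers, share-hash announcements, get_shares requests, and
        # bestblock header announcements.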
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.items:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if new_count:
                    set_best_share()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
            
            @defer.inlineCallbacks
            def handle_share_hashes(self, hashes, peer):
                new_hashes = [x for x in hashes if x not in tracker.items]
                if not new_hashes:
                    return
                try:
                    shares = yield peer.get_shares(
                        hashes=new_hashes,
                        parents=0,
                        stops=[],
                    )
                except:
                    log.err(None, 'in handle_share_hashes:')
                else:
                    self.handle_shares(shares, peer)
            
            def handle_get_shares(self, hashes, parents, stops, peer):
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                return shares
            
            def handle_bestblock(self, header, peer):
                if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
                    raise p2p.PeerMisbehavingError('received block header fails PoW test')
                handle_header(header)
        
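        # Found blocks are submitted along two paths: pushed to bitcoind over
        # the bitcoin P2P connection and submitted over RPC (submitblock, or
        # getmemorypool for older bitcoinds); both paths retry on error.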
        @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
        def submit_block_p2p(block):
            if factory.conn.value is None:
                print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
                raise deferral.RetrySilentlyException()
            factory.conn.value.send_block(block=block)
        
        @deferral.retry('Error submitting block: (will retry)', 10, 10)
        @defer.inlineCallbacks
        def submit_block_rpc(block, ignore_failure):
            if bitcoind_work.value['use_getblocktemplate']:
                result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result is None
            else:
                result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
                success = result
            success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
            if (not success and success_expected and not ignore_failure) or (success and not success_expected):
                print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
        
        def submit_block(block, ignore_failure):
            submit_block_p2p(block)
            submit_block_rpc(block, ignore_failure)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                submit_block(share.as_block(tracker), ignore_failure=True)
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                print
                def spread():
                    if (get_height_rel_highest(share.header['previous_block']) > -5 or
                        bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
                        broadcast_share(share.hash)
                spread()
                reactor.callLater(5, spread) # so get_height_rel_highest can update
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        @defer.inlineCallbacks
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                defer.returnValue(((yield reactor.resolve(ip)), int(port)))
            else:
                defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs')):
            try:
                with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
                    addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
            except:
                print >>sys.stderr, 'error parsing addrs'
        for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
            try:
                addr = yield addr_df
                if addr not in addrs:
                    addrs[addr] = (0, time.time(), time.time())
            except:
                log.err()
        
        connect_addrs = set()
        for addr_df in map(parse, args.p2pool_nodes):
            try:
                connect_addrs.add((yield addr_df))
            except:
                log.err()
        
        p2p_node = Node(
            best_share_hash_func=lambda: best_share_var.value,
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=connect_addrs,
            max_incoming_conns=args.p2pool_conns,
            traffic_happened=traffic_happened,
            known_txs_var=known_txs_var,
            mining_txs_var=mining_txs_var,
        )
        p2p_node.start()
        
        def save_addrs():
            with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
                f.write(json.dumps(p2p_node.addr_store.items()))
        task.LoopingCall(save_addrs).start(60)
        
        @best_block_header.changed.watch
        def _(header):
            for peer in p2p_node.peers.itervalues():
                peer.send_bestblock(header=header)
        
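        # broadcast_share() walks up to five ancestors of the given share,
        # stopping at the first one already sent, and pushes the new ones to
        # every peer except the one each share came from. It runs whenever the
        # best share changes and when a block-solving share arrives.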
        @defer.inlineCallbacks
        def broadcast_share(share_hash):
            shares = []
            for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in list(p2p_node.peers.itervalues()):
                yield peer.sendShares([share for share in shares if share.peer is not peer])
        
        # send share when the chain changes to their chain
        best_share_var.changed.watch(broadcast_share)
        
        def save_shares():
            for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.items:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
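        # download_shares runs forever: whenever the tracker reports shares it
        # wants but does not have, it asks a random peer for the missing share
        # and up to 500 of its parents, then feeds the result back through
        # handle_shares().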
        @apply
        @defer.inlineCallbacks
        def download_shares():
            while True:
                desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
                peer2, share_hash = random.choice(desired)
                
                if len(p2p_node.peers) == 0:
                    yield deferral.sleep(1)
                    continue
                peer = random.choice(p2p_node.peers.values())
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                try:
                    shares = yield peer.get_shares(
                        hashes=[share_hash],
                        parents=500,
                        stops=[],
                    )
                except:
                    log.err(None, 'in download_shares:')
                    continue
                
                if not shares:
                    yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
                    continue
                p2p_node.handle_shares(shares, peer)
        
        print '    ...success!'
        print
        
        if args.upnp:
            @defer.inlineCallbacks
            def upnp_thread():
                while True:
                    try:
                        is_lan, lan_ip = yield ipdiscover.get_local_ip()
                        if is_lan:
                            pm = yield portmapper.get_port_mapper()
                            yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                    except defer.TimeoutError:
                        pass
                    except:
                        if p2pool.DEBUG:
                            log.err(None, 'UPnP error:')
                    yield deferral.sleep(random.expovariate(1/120))
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
        
        get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
        
        wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
        web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened)
        worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
        
        deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
        
        with open(os.path.join(datadir_path, 'ready_flag'), 'wb') as f:
            pass
        
        print '    ...success!'
        print
        
        
        # done!
        print 'Started successfully!'
        print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
        if args.donation_percentage > 0.51:
            print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
        elif args.donation_percentage < 0.49:
            print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
        else:
            print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
            print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
        print
        
        
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        if args.irc_announce:
            from twisted.words.protocols import irc
            class IRCClient(irc.IRCClient):
                nickname = 'p2pool%02i' % (random.randrange(100),)
                channel = net.ANNOUNCE_CHANNEL
                def lineReceived(self, line):
                    if p2pool.DEBUG:
                        print repr(line)
                    irc.IRCClient.lineReceived(self, line)
                def signedOn(self):
                    irc.IRCClient.signedOn(self)
                    self.factory.resetDelay()
                    self.join(self.channel)
                    @defer.inlineCallbacks
                    def new_share(share):
                        if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
                            yield deferral.sleep(random.expovariate(1/60))
                            message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
                            if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
                                self.say(self.channel, message)
                                self._remember_message(message)
                    self.watch_id = tracker.verified.added.watch(new_share)
                    self.recent_messages = []
                def _remember_message(self, message):
                    self.recent_messages.append(message)
                    while len(self.recent_messages) > 100:
                        self.recent_messages.pop(0)
                def privmsg(self, user, channel, message):
                    if channel == self.channel:
                        self._remember_message(message)
                def connectionLost(self, reason):
                    tracker.verified.added.unwatch(self.watch_id)
                    print 'IRC connection lost:', reason.getErrorMessage()
            class IRCClientFactory(protocol.ReconnectingClientFactory):
                protocol = IRCClient
            reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
        
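        # status_thread prints a status line every few seconds (share chain
        # height, peers, local and pool hash rates, stale rates, expected times
        # to share/block, current payout) and echoes any warnings from
        # p2pool_data.get_warnings().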
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    height = tracker.get_height(best_share_var.value)
                    this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
                        height,
                        len(tracker.verified.items),
                        len(tracker.items),
                        len(p2p_node.peers),
                        sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                    ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                    
                    datums, dt = wb.local_rate_monitor.get_datums_in_last()
                    my_att_s = sum(datum['work']/dt for datum in datums)
                    this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
                        math.format(int(my_att_s)),
                        math.format_dt(dt),
                        math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
                        math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
                    )
                    
                    if height > 2:
                        (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
                        stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
                        real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
                        
                        this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
                            shares, stale_orphan_shares, stale_doa_shares,
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
                            math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
                            get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
                        )
                        this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
                            math.format(int(real_att_s)),
                            100*stale_prop,
                            math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
                        )
                        
                        for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
                            print >>sys.stderr, '#'*40
                            print >>sys.stderr, '>>> Warning: ' + warning
                            print >>sys.stderr, '#'*40
                    
                    if this_str != last_str or time.time() > last_time + 15:
                        print this_str
                        last_str = this_str
                        last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        reactor.stop()
        log.err(None, 'Fatal error:')

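# run() parses the command line, fills in missing bitcoind settings from
# bitcoin.conf, sets up logging and the optional bug reporter, and then starts
# the Twisted reactor with main() scheduled.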
def run():
    realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
    
    parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--datadir',
        help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
        type=str, action='store', default=None, dest='datadir')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged',
        help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
        type=str, action='append', default=[], dest='merged_urls')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work towards the development of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    parser.add_argument('--iocp',
        help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
        action='store_true', default=False, dest='iocp')
    parser.add_argument('--irc-announce',
        help='announce any blocks found on irc://irc.freenode.net/#p2pool',
        action='store_true', default=False, dest='irc_announce')
    parser.add_argument('--no-bugreport',
        help='disable submitting caught exceptions to the author',
        action='store_true', default=False, dest='no_bugreport')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    p2pool_group.add_argument('--max-conns', metavar='CONNS',
        help='maximum incoming connections (default: 40)',
        type=int, action='store', default=40, dest='p2pool_conns')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
        help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
        type=str, action='store', default=None, dest='worker_endpoint')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-rpc-ssl',
        help='connect to JSON-RPC interface using SSL',
        action='store_true', default=False, dest='bitcoind_rpc_ssl')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
        help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
        type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
        defer.setDebugging(True)
    
    net_name = args.net_name + ('_testnet' if args.testnet else '')
    net = networks.nets[net_name]
    
    datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if len(args.bitcoind_rpc_userpass) > 2:
        parser.error('a maximum of two arguments are allowed')
    args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
    
    if args.bitcoind_rpc_password is None:
        conf_path = net.PARENT.CONF_FILE_FUNC()
        if not os.path.exists(conf_path):
            parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
                '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
                '''\r\n'''
                '''server=1\r\n'''
                '''rpcpassword=%x\r\n'''
                '''\r\n'''
                '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
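        # No password on the command line: read rpcuser/rpcpassword/rpcport/port
        # from bitcoin.conf, ignoring comments, and use them for any setting not
        # already given explicitly.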
        conf = open(conf_path, 'rb').read()
        contents = {}
        for line in conf.splitlines(True):
            if '#' in line:
                line = line[:line.index('#')]
            if '=' not in line:
                continue
            k, v = line.split('=', 1)
            contents[k.strip()] = v.strip()
        for conf_name, var_name, var_type in [
            ('rpcuser', 'bitcoind_rpc_username', str),
            ('rpcpassword', 'bitcoind_rpc_password', str),
            ('rpcport', 'bitcoind_rpc_port', int),
            ('port', 'bitcoind_p2p_port', int),
        ]:
            if getattr(args, var_name) is None and conf_name in contents:
                setattr(args, var_name, var_type(contents[conf_name]))
        if args.bitcoind_rpc_password is None:
            parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
    
    if args.bitcoind_rpc_username is None:
        args.bitcoind_rpc_username = ''
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_endpoint is None:
        worker_endpoint = '', net.WORKER_PORT
    elif ':' not in args.worker_endpoint:
        worker_endpoint = '', int(args.worker_endpoint)
    else:
        addr, port = args.worker_endpoint.rsplit(':', 1)
        worker_endpoint = addr, int(port)
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    def separate_url(url):
        s = urlparse.urlsplit(url)
        if '@' not in s.netloc:
            parser.error('merged url netloc must contain an "@"')
        userpass, new_netloc = s.netloc.rsplit('@', 1)
        return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
    merged_urls = map(separate_url, args.merged_urls)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    logfile = logging.LogFile(args.logfile)
    pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = logging.AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
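    # Unless --no-bugreport is given, ErrorReporter is registered as a Twisted
    # log observer and POSTs error tracebacks (rate-limited to one every five
    # seconds) to the author's collection endpoint.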
    class ErrorReporter(object):
        def __init__(self):
            self.last_sent = None
        
        def emit(self, eventDict):
            if not eventDict["isError"]:
                return
            
            if self.last_sent is not None and time.time() < self.last_sent + 5:
                return
            self.last_sent = time.time()
            
            if 'failure' in eventDict:
                text = ((eventDict.get('why') or 'Unhandled Error')
                    + '\n' + eventDict['failure'].getTraceback())
            else:
                text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
            
            from twisted.web import client
            client.getPage(
                url='http://u.forre.st/p2pool_error.cgi',
                method='POST',
                postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
                timeout=15,
            ).addBoth(lambda x: None)
    if not args.no_bugreport:
        log.addObserver(ErrorReporter().emit)
    
    reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
    reactor.run()