add transactions from bitcoind over p2p connection to known_txs
[p2pool.git] / p2pool / main.py
1 from __future__ import division
2
3 import base64
4 import json
5 import os
6 import random
7 import sys
8 import time
9 import signal
10 import traceback
11 import urlparse
12
13 if '--iocp' in sys.argv:
14     from twisted.internet import iocpreactor
15     iocpreactor.install()
16 from twisted.internet import defer, reactor, protocol, task
17 from twisted.web import server
18 from twisted.python import log
19 from nattraverso import portmapper, ipdiscover
20
21 import bitcoin.p2p as bitcoin_p2p, bitcoin.data as bitcoin_data
22 from bitcoin import worker_interface, height_tracker
23 from util import fixargparse, jsonrpc, variable, deferral, math, logging
24 from . import p2p, networks, web, work
25 import p2pool, p2pool.data as p2pool_data
26
27 @deferral.retry('Error getting work from bitcoind:', 3)
28 @defer.inlineCallbacks
29 def getwork(bitcoind, use_getblocktemplate=False):
30     def go():
31         if use_getblocktemplate:
32             return bitcoind.rpc_getblocktemplate(dict(mode='template'))
33         else:
34             return bitcoind.rpc_getmemorypool()
35     try:
36         work = yield go()
37     except jsonrpc.Error_for_code(-32601): # Method not found
38         use_getblocktemplate = not use_getblocktemplate
39         try:
40             work = yield go()
41         except jsonrpc.Error_for_code(-32601): # Method not found
42             print >>sys.stderr, 'Error: Bitcoin version too old! Upgrade to v0.5 or newer!'
43             raise deferral.RetrySilentlyException()
44     packed_transactions = [(x['data'] if isinstance(x, dict) else x).decode('hex') for x in work['transactions']]
45     if 'height' not in work:
46         work['height'] = (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
47     elif p2pool.DEBUG:
48         assert work['height'] == (yield bitcoind.rpc_getblock(work['previousblockhash']))['height'] + 1
49     defer.returnValue(dict(
50         version=work['version'],
51         previous_block=int(work['previousblockhash'], 16),
52         transactions=map(bitcoin_data.tx_type.unpack, packed_transactions),
53         subsidy=work['coinbasevalue'],
54         time=work['time'] if 'time' in work else work['curtime'],
55         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
56         coinbaseflags=work['coinbaseflags'].decode('hex') if 'coinbaseflags' in work else ''.join(x.decode('hex') for x in work['coinbaseaux'].itervalues()) if 'coinbaseaux' in work else '',
57         height=work['height'],
58         last_update=time.time(),
59         use_getblocktemplate=use_getblocktemplate,
60     ))
61
62 @defer.inlineCallbacks
63 def main(args, net, datadir_path, merged_urls, worker_endpoint):
64     try:
65         print 'p2pool (version %s)' % (p2pool.__version__,)
66         print
67         
68         traffic_happened = variable.Event()
69         
70         @defer.inlineCallbacks
71         def connect_p2p():
72             # connect to bitcoind over bitcoin-p2p
73             print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
74             factory = bitcoin_p2p.ClientFactory(net.PARENT)
75             reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
76             yield factory.getProtocol() # waits until handshake is successful
77             print '    ...success!'
78             print
79             defer.returnValue(factory)
80         
81         if args.testnet: # establish p2p connection first if testnet so bitcoind can work without connections
82             factory = yield connect_p2p()
83         
84         # connect to bitcoind over JSON-RPC and do initial getmemorypool
85         url = '%s://%s:%i/' % ('https' if args.bitcoind_rpc_ssl else 'http', args.bitcoind_address, args.bitcoind_rpc_port)
86         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
87         bitcoind = jsonrpc.Proxy(url, dict(Authorization='Basic ' + base64.b64encode(args.bitcoind_rpc_username + ':' + args.bitcoind_rpc_password)), timeout=30)
88         @deferral.retry('Error while checking Bitcoin connection:', 1)
89         @defer.inlineCallbacks
90         def check():
91             if not (yield net.PARENT.RPC_CHECK(bitcoind)):
92                 print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
93                 raise deferral.RetrySilentlyException()
94             if not net.VERSION_CHECK((yield bitcoind.rpc_getinfo())['version']):
95                 print >>sys.stderr, '    Bitcoin version too old! Upgrade to 0.6.4 or newer!'
96                 raise deferral.RetrySilentlyException()
97         yield check()
98         temp_work = yield getwork(bitcoind)
99         
100         if not args.testnet:
101             factory = yield connect_p2p()
102         
103         block_height_var = variable.Variable(None)
104         @defer.inlineCallbacks
105         def poll_height():
106             block_height_var.set((yield deferral.retry('Error while calling getblockcount:')(bitcoind.rpc_getblockcount)()))
107         yield poll_height()
108         task.LoopingCall(poll_height).start(60*60)
109         
110         bitcoind_warning_var = variable.Variable(None)
111         @defer.inlineCallbacks
112         def poll_warnings():
113             errors = (yield deferral.retry('Error while calling getmininginfo:')(bitcoind.rpc_getmininginfo)())['errors']
114             bitcoind_warning_var.set(errors if errors != '' else None)
115         yield poll_warnings()
116         task.LoopingCall(poll_warnings).start(20*60)
117         
118         print '    ...success!'
119         print '    Current block hash: %x' % (temp_work['previous_block'],)
120         print '    Current block height: %i' % (block_height_var.value,)
121         print
122         
123         print 'Determining payout address...'
124         if args.pubkey_hash is None:
125             address_path = os.path.join(datadir_path, 'cached_payout_address')
126             
127             if os.path.exists(address_path):
128                 with open(address_path, 'rb') as f:
129                     address = f.read().strip('\r\n')
130                 print '    Loaded cached address: %s...' % (address,)
131             else:
132                 address = None
133             
134             if address is not None:
135                 res = yield deferral.retry('Error validating cached address:', 5)(lambda: bitcoind.rpc_validateaddress(address))()
136                 if not res['isvalid'] or not res['ismine']:
137                     print '    Cached address is either invalid or not controlled by local bitcoind!'
138                     address = None
139             
140             if address is None:
141                 print '    Getting payout address from bitcoind...'
142                 address = yield deferral.retry('Error getting payout address from bitcoind:', 5)(lambda: bitcoind.rpc_getaccountaddress('p2pool'))()
143             
144             with open(address_path, 'wb') as f:
145                 f.write(address)
146             
147             my_pubkey_hash = bitcoin_data.address_to_pubkey_hash(address, net.PARENT)
148         else:
149             my_pubkey_hash = args.pubkey_hash
150         print '    ...success! Payout address:', bitcoin_data.pubkey_hash_to_address(my_pubkey_hash, net.PARENT)
151         print
152         
153         my_share_hashes = set()
154         my_doa_share_hashes = set()
155         
156         tracker = p2pool_data.OkayTracker(net, my_share_hashes, my_doa_share_hashes)
157         shared_share_hashes = set()
158         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
159         known_verified = set()
160         print "Loading shares..."
161         for i, (mode, contents) in enumerate(ss.get_shares()):
162             if mode == 'share':
163                 if contents.hash in tracker.items:
164                     continue
165                 shared_share_hashes.add(contents.hash)
166                 contents.time_seen = 0
167                 tracker.add(contents)
168                 if len(tracker.items) % 1000 == 0 and tracker.items:
169                     print "    %i" % (len(tracker.items),)
170             elif mode == 'verified_hash':
171                 known_verified.add(contents)
172             else:
173                 raise AssertionError()
174         print "    ...inserting %i verified shares..." % (len(known_verified),)
175         for h in known_verified:
176             if h not in tracker.items:
177                 ss.forget_verified_share(h)
178                 continue
179             tracker.verified.add(tracker.items[h])
180         print "    ...done loading %i shares!" % (len(tracker.items),)
181         print
182         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
183         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
184         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
185         
186         print 'Initializing work...'
187         
188         
189         # BITCOIND WORK
190         
191         bitcoind_work = variable.Variable((yield getwork(bitcoind)))
192         @defer.inlineCallbacks
193         def work_poller():
194             while True:
195                 flag = factory.new_block.get_deferred()
196                 try:
197                     bitcoind_work.set((yield getwork(bitcoind, bitcoind_work.value['use_getblocktemplate'])))
198                 except:
199                     log.err()
200                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
201         work_poller()
202         
203         # PEER WORK
204         
205         best_block_header = variable.Variable(None)
206         def handle_header(new_header):
207             # check that header matches current target
208             if not (net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= bitcoind_work.value['bits'].target):
209                 return
210             bitcoind_best_block = bitcoind_work.value['previous_block']
211             if (best_block_header.value is None
212                 or (
213                     new_header['previous_block'] == bitcoind_best_block and
214                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(best_block_header.value)) == bitcoind_best_block
215                 ) # new is child of current and previous is current
216                 or (
217                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
218                     best_block_header.value['previous_block'] != bitcoind_best_block
219                 )): # new is current and previous is not a child of current
220                 best_block_header.set(new_header)
221         @defer.inlineCallbacks
222         def poll_header():
223             handle_header((yield factory.conn.value.get_block_header(bitcoind_work.value['previous_block'])))
224         bitcoind_work.changed.watch(lambda _: poll_header())
225         yield deferral.retry('Error while requesting best block header:')(poll_header)()
226         
227         # BEST SHARE
228         
229         known_txs_var = variable.Variable({}) # hash -> tx
230         mining_txs_var = variable.Variable({}) # hash -> tx
231         get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(bitcoind, factory, lambda: bitcoind_work.value['previous_block'], net)
232         
233         best_share_var = variable.Variable(None)
234         desired_var = variable.Variable(None)
235         def set_best_share():
236             best, desired = tracker.think(get_height_rel_highest, bitcoind_work.value['previous_block'], bitcoind_work.value['bits'], known_txs_var.value)
237             
238             best_share_var.set(best)
239             desired_var.set(desired)
240         bitcoind_work.changed.watch(lambda _: set_best_share())
241         set_best_share()
242         
243         print '    ...success!'
244         print
245         
246         # setup p2p logic and join p2pool network
247         
248         # update mining_txs according to getwork results
249         @bitcoind_work.changed.run_and_watch
250         def _(_=None):
251             new_mining_txs = {}
252             new_known_txs = dict(known_txs_var.value)
253             for tx in bitcoind_work.value['transactions']:
254                 tx_hash = bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))
255                 new_mining_txs[tx_hash] = tx
256                 new_known_txs[tx_hash] = tx
257             mining_txs_var.set(new_mining_txs)
258             known_txs_var.set(new_known_txs)
259         # add p2p transactions from bitcoind to known_txs
260         @factory.new_tx.watch
261         def _(tx):
262             new_known_txs = dict(known_txs_var.value)
263             new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
264             known_txs_var.set(new_known_txs)
265         # forward transactions seen to bitcoind
266         @known_txs_var.transitioned.watch
267         def _(before, after):
268             for tx_hash in set(after) - set(before):
269                 factory.conn.value.send_tx(tx=after[tx_hash])
270         
271         class Node(p2p.Node):
272             def handle_shares(self, shares, peer):
273                 if len(shares) > 5:
274                     print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
275                 
276                 new_count = 0
277                 for share in shares:
278                     if share.hash in tracker.items:
279                         #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
280                         continue
281                     
282                     new_count += 1
283                     
284                     #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
285                     
286                     tracker.add(share)
287                 
288                 if new_count:
289                     set_best_share()
290                 
291                 if len(shares) > 5:
292                     print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.items), 2*net.CHAIN_LENGTH)
293             
294             @defer.inlineCallbacks
295             def handle_share_hashes(self, hashes, peer):
296                 new_hashes = [x for x in hashes if x not in tracker.items]
297                 if not new_hashes:
298                     return
299                 try:
300                     shares = yield peer.get_shares(
301                         hashes=new_hashes,
302                         parents=0,
303                         stops=[],
304                     )
305                 except:
306                     log.err(None, 'in handle_share_hashes:')
307                 else:
308                     self.handle_shares(shares, peer)
309             
310             def handle_get_shares(self, hashes, parents, stops, peer):
311                 parents = min(parents, 1000//len(hashes))
312                 stops = set(stops)
313                 shares = []
314                 for share_hash in hashes:
315                     for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
316                         if share.hash in stops:
317                             break
318                         shares.append(share)
319                 print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
320                 return shares
321             
322             def handle_bestblock(self, header, peer):
323                 if net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
324                     raise p2p.PeerMisbehavingError('received block header fails PoW test')
325                 handle_header(header)
326         
327         @deferral.retry('Error submitting primary block: (will retry)', 10, 10)
328         def submit_block_p2p(block):
329             if factory.conn.value is None:
330                 print >>sys.stderr, 'No bitcoind connection when block submittal attempted! %s%064x' % (net.PARENT.BLOCK_EXPLORER_URL_PREFIX, bitcoin_data.hash256(bitcoin_data.block_header_type.pack(block['header'])))
331                 raise deferral.RetrySilentlyException()
332             factory.conn.value.send_block(block=block)
333         
334         @deferral.retry('Error submitting block: (will retry)', 10, 10)
335         @defer.inlineCallbacks
336         def submit_block_rpc(block, ignore_failure):
337             if bitcoind_work.value['use_getblocktemplate']:
338                 result = yield bitcoind.rpc_submitblock(bitcoin_data.block_type.pack(block).encode('hex'))
339                 success = result is None
340             else:
341                 result = yield bitcoind.rpc_getmemorypool(bitcoin_data.block_type.pack(block).encode('hex'))
342                 success = result
343             success_expected = net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(block['header'])) <= block['header']['bits'].target
344             if (not success and success_expected and not ignore_failure) or (success and not success_expected):
345                 print >>sys.stderr, 'Block submittal result: %s (%r) Expected: %s' % (success, result, success_expected)
346         
347         def submit_block(block, ignore_failure):
348             submit_block_p2p(block)
349             submit_block_rpc(block, ignore_failure)
350         
351         @tracker.verified.added.watch
352         def _(share):
353             if share.pow_hash <= share.header['bits'].target:
354                 block = share.as_block(tracker, known_txs_var.value)
355                 if block is None:
356                     print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
357                     return
358                 submit_block(block, ignore_failure=True)
359                 print
360                 print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
361                 print
362                 def spread():
363                     if (get_height_rel_highest(share.header['previous_block']) > -5 or
364                         bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
365                         broadcast_share(share.hash)
366                 spread()
367                 reactor.callLater(5, spread) # so get_height_rel_highest can update
368         
369         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
370         
371         @defer.inlineCallbacks
372         def parse(x):
373             if ':' in x:
374                 ip, port = x.split(':')
375                 defer.returnValue(((yield reactor.resolve(ip)), int(port)))
376             else:
377                 defer.returnValue(((yield reactor.resolve(x)), net.P2P_PORT))
378         
379         addrs = {}
380         if os.path.exists(os.path.join(datadir_path, 'addrs')):
381             try:
382                 with open(os.path.join(datadir_path, 'addrs'), 'rb') as f:
383                     addrs.update(dict((tuple(k), v) for k, v in json.loads(f.read())))
384             except:
385                 print >>sys.stderr, 'error parsing addrs'
386         for addr_df in map(parse, net.BOOTSTRAP_ADDRS):
387             try:
388                 addr = yield addr_df
389                 if addr not in addrs:
390                     addrs[addr] = (0, time.time(), time.time())
391             except:
392                 log.err()
393         
394         connect_addrs = set()
395         for addr_df in map(parse, args.p2pool_nodes):
396             try:
397                 connect_addrs.add((yield addr_df))
398             except:
399                 log.err()
400         
401         p2p_node = Node(
402             best_share_hash_func=lambda: best_share_var.value,
403             port=args.p2pool_port,
404             net=net,
405             addr_store=addrs,
406             connect_addrs=connect_addrs,
407             max_incoming_conns=args.p2pool_conns,
408             traffic_happened=traffic_happened,
409             known_txs_var=known_txs_var,
410             mining_txs_var=mining_txs_var,
411         )
412         p2p_node.start()
413         
414         def save_addrs():
415             with open(os.path.join(datadir_path, 'addrs'), 'wb') as f:
416                 f.write(json.dumps(p2p_node.addr_store.items()))
417         task.LoopingCall(save_addrs).start(60)
418         
419         @best_block_header.changed.watch
420         def _(header):
421             for peer in p2p_node.peers.itervalues():
422                 peer.send_bestblock(header=header)
423         
424         @defer.inlineCallbacks
425         def broadcast_share(share_hash):
426             shares = []
427             for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
428                 if share.hash in shared_share_hashes:
429                     break
430                 shared_share_hashes.add(share.hash)
431                 shares.append(share)
432             
433             for peer in list(p2p_node.peers.itervalues()):
434                 yield peer.sendShares([share for share in shares if share.peer is not peer], tracker, known_txs_var.value, include_txs_with=[share_hash])
435         
436         # send share when the chain changes to their chain
437         best_share_var.changed.watch(broadcast_share)
438         
439         def save_shares():
440             for share in tracker.get_chain(best_share_var.value, min(tracker.get_height(best_share_var.value), 2*net.CHAIN_LENGTH)):
441                 ss.add_share(share)
442                 if share.hash in tracker.verified.items:
443                     ss.add_verified_hash(share.hash)
444         task.LoopingCall(save_shares).start(60)
445         
446         @apply
447         @defer.inlineCallbacks
448         def download_shares():
449             while True:
450                 desired = yield desired_var.get_when_satisfies(lambda val: len(val) != 0)
451                 peer2, share_hash = random.choice(desired)
452                 
453                 if len(p2p_node.peers) == 0:
454                     yield deferral.sleep(1)
455                     continue
456                 peer = random.choice(p2p_node.peers.values())
457                 
458                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
459                 try:
460                     shares = yield peer.get_shares(
461                         hashes=[share_hash],
462                         parents=500,
463                         stops=[],
464                     )
465                 except:
466                     log.err(None, 'in download_shares:')
467                     continue
468                 
469                 if not shares:
470                     yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
471                     continue
472                 p2p_node.handle_shares(shares, peer)
473         
474         print '    ...success!'
475         print
476         
477         if args.upnp:
478             @defer.inlineCallbacks
479             def upnp_thread():
480                 while True:
481                     try:
482                         is_lan, lan_ip = yield ipdiscover.get_local_ip()
483                         if is_lan:
484                             pm = yield portmapper.get_port_mapper()
485                             yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
486                     except defer.TimeoutError:
487                         pass
488                     except:
489                         if p2pool.DEBUG:
490                             log.err(None, 'UPnP error:')
491                     yield deferral.sleep(random.expovariate(1/120))
492             upnp_thread()
493         
494         # start listening for workers with a JSON-RPC server
495         
496         print 'Listening for workers on %r port %i...' % (worker_endpoint[0], worker_endpoint[1])
497         
498         get_current_txouts = lambda: p2pool_data.get_expected_payouts(tracker, best_share_var.value, bitcoind_work.value['bits'].target, bitcoind_work.value['subsidy'], net)
499         
500         wb = work.WorkerBridge(my_pubkey_hash, net, args.donation_percentage, bitcoind_work, best_block_header, merged_urls, best_share_var, tracker, my_share_hashes, my_doa_share_hashes, args.worker_fee, p2p_node, submit_block, set_best_share, broadcast_share, block_height_var)
501         web_root = web.get_web_root(tracker, bitcoind_work, get_current_txouts, datadir_path, net, wb.get_stale_counts, my_pubkey_hash, wb.local_rate_monitor, args.worker_fee, p2p_node, wb.my_share_hashes, wb.pseudoshare_received, wb.share_received, best_share_var, bitcoind_warning_var, traffic_happened)
502         worker_interface.WorkerInterface(wb).attach_to(web_root, get_handler=lambda request: request.redirect('/static/'))
503         
504         deferral.retry('Error binding to worker port:', traceback=False)(reactor.listenTCP)(worker_endpoint[1], server.Site(web_root), interface=worker_endpoint[0])
505         
506         with open(os.path.join(os.path.join(datadir_path, 'ready_flag')), 'wb') as f:
507             pass
508         
509         print '    ...success!'
510         print
511         
512         
513         # done!
514         print 'Started successfully!'
515         print 'Go to http://127.0.0.1:%i/ to view graphs and statistics!' % (worker_endpoint[1],)
516         if args.donation_percentage > 0.51:
517             print '''Donating %.1f%% of work towards P2Pool's development. Thanks for the tip!''' % (args.donation_percentage,)
518         elif args.donation_percentage < 0.49:
519             print '''Donating %.1f%% of work towards P2Pool's development. Please donate to encourage further development of P2Pool!''' % (args.donation_percentage,)
520         else:
521             print '''Donating %.1f%% of work towards P2Pool's development. Thank you!''' % (args.donation_percentage,)
522             print 'You can increase this amount with --give-author argument! (or decrease it, if you must)'
523         print
524         
525         
526         if hasattr(signal, 'SIGALRM'):
527             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
528                 sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
529             ))
530             signal.siginterrupt(signal.SIGALRM, False)
531             task.LoopingCall(signal.alarm, 30).start(1)
532         
533         if args.irc_announce:
534             from twisted.words.protocols import irc
535             class IRCClient(irc.IRCClient):
536                 nickname = 'p2pool%02i' % (random.randrange(100),)
537                 channel = net.ANNOUNCE_CHANNEL
538                 def lineReceived(self, line):
539                     if p2pool.DEBUG:
540                         print repr(line)
541                     irc.IRCClient.lineReceived(self, line)
542                 def signedOn(self):
543                     irc.IRCClient.signedOn(self)
544                     self.factory.resetDelay()
545                     self.join(self.channel)
546                     @defer.inlineCallbacks
547                     def new_share(share):
548                         if share.pow_hash <= share.header['bits'].target and abs(share.timestamp - time.time()) < 10*60:
549                             yield deferral.sleep(random.expovariate(1/60))
550                             message = '\x02%s BLOCK FOUND by %s! %s%064x' % (net.NAME.upper(), bitcoin_data.script2_to_address(share.new_script, net.PARENT), net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
551                             if all('%x' % (share.header_hash,) not in old_message for old_message in self.recent_messages):
552                                 self.say(self.channel, message)
553                                 self._remember_message(message)
554                     self.watch_id = tracker.verified.added.watch(new_share)
555                     self.recent_messages = []
556                 def _remember_message(self, message):
557                     self.recent_messages.append(message)
558                     while len(self.recent_messages) > 100:
559                         self.recent_messages.pop(0)
560                 def privmsg(self, user, channel, message):
561                     if channel == self.channel:
562                         self._remember_message(message)
563                 def connectionLost(self, reason):
564                     tracker.verified.added.unwatch(self.watch_id)
565                     print 'IRC connection lost:', reason.getErrorMessage()
566             class IRCClientFactory(protocol.ReconnectingClientFactory):
567                 protocol = IRCClient
568             reactor.connectTCP("irc.freenode.net", 6667, IRCClientFactory())
569         
570         @defer.inlineCallbacks
571         def status_thread():
572             last_str = None
573             last_time = 0
574             while True:
575                 yield deferral.sleep(3)
576                 try:
577                     height = tracker.get_height(best_share_var.value)
578                     this_str = 'P2Pool: %i shares in chain (%i verified/%i total) Peers: %i (%i incoming)' % (
579                         height,
580                         len(tracker.verified.items),
581                         len(tracker.items),
582                         len(p2p_node.peers),
583                         sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
584                     ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
585                     
586                     datums, dt = wb.local_rate_monitor.get_datums_in_last()
587                     my_att_s = sum(datum['work']/dt for datum in datums)
588                     this_str += '\n Local: %sH/s in last %s Local dead on arrival: %s Expected time to share: %s' % (
589                         math.format(int(my_att_s)),
590                         math.format_dt(dt),
591                         math.format_binomial_conf(sum(1 for datum in datums if datum['dead']), len(datums), 0.95),
592                         math.format_dt(2**256 / tracker.items[best_share_var.value].max_target / my_att_s) if my_att_s and best_share_var.value else '???',
593                     )
594                     
595                     if height > 2:
596                         (stale_orphan_shares, stale_doa_shares), shares, _ = wb.get_stale_counts()
597                         stale_prop = p2pool_data.get_average_stale_prop(tracker, best_share_var.value, min(60*60//net.SHARE_PERIOD, height))
598                         real_att_s = p2pool_data.get_pool_attempts_per_second(tracker, best_share_var.value, min(height - 1, 60*60//net.SHARE_PERIOD)) / (1 - stale_prop)
599                         
600                         this_str += '\n Shares: %i (%i orphan, %i dead) Stale rate: %s Efficiency: %s Current payout: %.4f %s' % (
601                             shares, stale_orphan_shares, stale_doa_shares,
602                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95),
603                             math.format_binomial_conf(stale_orphan_shares + stale_doa_shares, shares, 0.95, lambda x: (1 - x)/(1 - stale_prop)),
604                             get_current_txouts().get(bitcoin_data.pubkey_hash_to_script2(my_pubkey_hash), 0)*1e-8, net.PARENT.SYMBOL,
605                         )
606                         this_str += '\n Pool: %sH/s Stale rate: %.1f%% Expected time to block: %s' % (
607                             math.format(int(real_att_s)),
608                             100*stale_prop,
609                             math.format_dt(2**256 / bitcoind_work.value['bits'].target / real_att_s),
610                         )
611                         
612                         for warning in p2pool_data.get_warnings(tracker, best_share_var.value, net, bitcoind_warning_var.value, bitcoind_work.value):
613                             print >>sys.stderr, '#'*40
614                             print >>sys.stderr, '>>> Warning: ' + warning
615                             print >>sys.stderr, '#'*40
616                     
617                     if this_str != last_str or time.time() > last_time + 15:
618                         print this_str
619                         last_str = this_str
620                         last_time = time.time()
621                 except:
622                     log.err()
623         status_thread()
624     except:
625         reactor.stop()
626         log.err(None, 'Fatal error:')
627
628 def run():
629     realnets = dict((name, net) for name, net in networks.nets.iteritems() if '_testnet' not in name)
630     
631     parser = fixargparse.FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
632     parser.add_argument('--version', action='version', version=p2pool.__version__)
633     parser.add_argument('--net',
634         help='use specified network (default: bitcoin)',
635         action='store', choices=sorted(realnets), default='bitcoin', dest='net_name')
636     parser.add_argument('--testnet',
637         help='''use the network's testnet''',
638         action='store_const', const=True, default=False, dest='testnet')
639     parser.add_argument('--debug',
640         help='enable debugging mode',
641         action='store_const', const=True, default=False, dest='debug')
642     parser.add_argument('-a', '--address',
643         help='generate payouts to this address (default: <address requested from bitcoind>)',
644         type=str, action='store', default=None, dest='address')
645     parser.add_argument('--datadir',
646         help='store data in this directory (default: <directory run_p2pool.py is in>/data)',
647         type=str, action='store', default=None, dest='datadir')
648     parser.add_argument('--logfile',
649         help='''log to this file (default: data/<NET>/log)''',
650         type=str, action='store', default=None, dest='logfile')
651     parser.add_argument('--merged',
652         help='call getauxblock on this url to get work for merged mining (example: http://ncuser:ncpass@127.0.0.1:10332/)',
653         type=str, action='append', default=[], dest='merged_urls')
654     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
655         help='donate this percentage of work towards the development of p2pool (default: 0.5)',
656         type=float, action='store', default=0.5, dest='donation_percentage')
657     parser.add_argument('--iocp',
658         help='use Windows IOCP API in order to avoid errors due to large number of sockets being open',
659         action='store_true', default=False, dest='iocp')
660     parser.add_argument('--irc-announce',
661         help='announce any blocks found on irc://irc.freenode.net/#p2pool',
662         action='store_true', default=False, dest='irc_announce')
663     parser.add_argument('--no-bugreport',
664         help='disable submitting caught exceptions to the author',
665         action='store_true', default=False, dest='no_bugreport')
666     
667     p2pool_group = parser.add_argument_group('p2pool interface')
668     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
669         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (name, net.P2P_PORT) for name, net in sorted(realnets.items())),
670         type=int, action='store', default=None, dest='p2pool_port')
671     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
672         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
673         type=str, action='append', default=[], dest='p2pool_nodes')
674     parser.add_argument('--disable-upnp',
675         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
676         action='store_false', default=True, dest='upnp')
677     p2pool_group.add_argument('--max-conns', metavar='CONNS',
678         help='maximum incoming connections (default: 40)',
679         type=int, action='store', default=40, dest='p2pool_conns')
680     
681     worker_group = parser.add_argument_group('worker interface')
682     worker_group.add_argument('-w', '--worker-port', metavar='PORT or ADDR:PORT',
683         help='listen on PORT on interface with ADDR for RPC connections from miners (default: all interfaces, %s)' % ', '.join('%s:%i' % (name, net.WORKER_PORT) for name, net in sorted(realnets.items())),
684         type=str, action='store', default=None, dest='worker_endpoint')
685     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
686         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
687         type=float, action='store', default=0, dest='worker_fee')
688     
689     bitcoind_group = parser.add_argument_group('bitcoind interface')
690     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
691         help='connect to this address (default: 127.0.0.1)',
692         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
693     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
694         help='''connect to JSON-RPC interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.RPC_PORT) for name, net in sorted(realnets.items())),
695         type=int, action='store', default=None, dest='bitcoind_rpc_port')
696     bitcoind_group.add_argument('--bitcoind-rpc-ssl',
697         help='connect to JSON-RPC interface using SSL',
698         action='store_true', default=False, dest='bitcoind_rpc_ssl')
699     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
700         help='''connect to P2P interface at this port (default: %s <read from bitcoin.conf if password not provided>)''' % ', '.join('%s:%i' % (name, net.PARENT.P2P_PORT) for name, net in sorted(realnets.items())),
701         type=int, action='store', default=None, dest='bitcoind_p2p_port')
702     
703     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSERPASS',
704         help='bitcoind RPC interface username, then password, space-separated (only one being provided will cause the username to default to being empty, and none will cause P2Pool to read them from bitcoin.conf)',
705         type=str, action='store', default=[], nargs='*', dest='bitcoind_rpc_userpass')
706     
707     args = parser.parse_args()
708     
709     if args.debug:
710         p2pool.DEBUG = True
711         defer.setDebugging(True)
712     
713     net_name = args.net_name + ('_testnet' if args.testnet else '')
714     net = networks.nets[net_name]
715     
716     datadir_path = os.path.join((os.path.join(os.path.dirname(sys.argv[0]), 'data') if args.datadir is None else args.datadir), net_name)
717     if not os.path.exists(datadir_path):
718         os.makedirs(datadir_path)
719     
720     if len(args.bitcoind_rpc_userpass) > 2:
721         parser.error('a maximum of two arguments are allowed')
722     args.bitcoind_rpc_username, args.bitcoind_rpc_password = ([None, None] + args.bitcoind_rpc_userpass)[-2:]
723     
724     if args.bitcoind_rpc_password is None:
725         conf_path = net.PARENT.CONF_FILE_FUNC()
726         if not os.path.exists(conf_path):
727             parser.error('''Bitcoin configuration file not found. Manually enter your RPC password.\r\n'''
728                 '''If you actually haven't created a configuration file, you should create one at %s with the text:\r\n'''
729                 '''\r\n'''
730                 '''server=1\r\n'''
731                 '''rpcpassword=%x\r\n'''
732                 '''\r\n'''
733                 '''Keep that password secret! After creating the file, restart Bitcoin.''' % (conf_path, random.randrange(2**128)))
734         conf = open(conf_path, 'rb').read()
735         contents = {}
736         for line in conf.splitlines(True):
737             if '#' in line:
738                 line = line[:line.index('#')]
739             if '=' not in line:
740                 continue
741             k, v = line.split('=', 1)
742             contents[k.strip()] = v.strip()
743         for conf_name, var_name, var_type in [
744             ('rpcuser', 'bitcoind_rpc_username', str),
745             ('rpcpassword', 'bitcoind_rpc_password', str),
746             ('rpcport', 'bitcoind_rpc_port', int),
747             ('port', 'bitcoind_p2p_port', int),
748         ]:
749             if getattr(args, var_name) is None and conf_name in contents:
750                 setattr(args, var_name, var_type(contents[conf_name]))
751         if args.bitcoind_rpc_password is None:
752             parser.error('''Bitcoin configuration file didn't contain an rpcpassword= line! Add one!''')
753     
754     if args.bitcoind_rpc_username is None:
755         args.bitcoind_rpc_username = ''
756     
757     if args.bitcoind_rpc_port is None:
758         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
759     
760     if args.bitcoind_p2p_port is None:
761         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
762     
763     if args.p2pool_port is None:
764         args.p2pool_port = net.P2P_PORT
765     
766     if args.worker_endpoint is None:
767         worker_endpoint = '', net.WORKER_PORT
768     elif ':' not in args.worker_endpoint:
769         worker_endpoint = '', int(args.worker_endpoint)
770     else:
771         addr, port = args.worker_endpoint.rsplit(':', 1)
772         worker_endpoint = addr, int(port)
773     
774     if args.address is not None:
775         try:
776             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
777         except Exception, e:
778             parser.error('error parsing address: ' + repr(e))
779     else:
780         args.pubkey_hash = None
781     
782     def separate_url(url):
783         s = urlparse.urlsplit(url)
784         if '@' not in s.netloc:
785             parser.error('merged url netloc must contain an "@"')
786         userpass, new_netloc = s.netloc.rsplit('@', 1)
787         return urlparse.urlunsplit(s._replace(netloc=new_netloc)), userpass
788     merged_urls = map(separate_url, args.merged_urls)
789     
790     if args.logfile is None:
791         args.logfile = os.path.join(datadir_path, 'log')
792     
793     logfile = logging.LogFile(args.logfile)
794     pipe = logging.TimestampingPipe(logging.TeePipe([logging.EncodeReplacerPipe(sys.stderr), logfile]))
795     sys.stdout = logging.AbortPipe(pipe)
796     sys.stderr = log.DefaultObserver.stderr = logging.AbortPipe(logging.PrefixPipe(pipe, '> '))
797     if hasattr(signal, "SIGUSR1"):
798         def sigusr1(signum, frame):
799             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
800             logfile.reopen()
801             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
802         signal.signal(signal.SIGUSR1, sigusr1)
803     task.LoopingCall(logfile.reopen).start(5)
804     
805     class ErrorReporter(object):
806         def __init__(self):
807             self.last_sent = None
808         
809         def emit(self, eventDict):
810             if not eventDict["isError"]:
811                 return
812             
813             if self.last_sent is not None and time.time() < self.last_sent + 5:
814                 return
815             self.last_sent = time.time()
816             
817             if 'failure' in eventDict:
818                 text = ((eventDict.get('why') or 'Unhandled Error')
819                     + '\n' + eventDict['failure'].getTraceback())
820             else:
821                 text = " ".join([str(m) for m in eventDict["message"]]) + "\n"
822             
823             from twisted.web import client
824             client.getPage(
825                 url='http://u.forre.st/p2pool_error.cgi',
826                 method='POST',
827                 postdata=p2pool.__version__ + ' ' + net.NAME + '\n' + text,
828                 timeout=15,
829             ).addBoth(lambda x: None)
830     if not args.no_bugreport:
831         log.addObserver(ErrorReporter().emit)
832     
833     reactor.callWhenRunning(main, args, net, datadir_path, merged_urls, worker_endpoint)
834     reactor.run()