default to ixcoin's changed RPC port when using it
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import datetime
7 import itertools
8 import os
9 import random
10 import sqlite3
11 import struct
12 import sys
13 import time
14 import json
15 import signal
16 import traceback
17
18 from twisted.internet import defer, reactor, task
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
24 from util import db, expiring_dict, jsonrpc, variable, deferral, math
25 from . import p2p, worker_interface, skiplists
26 import p2pool.data as p2pool
27 import p2pool as p2pool_init
28
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, ht):
    """Fetch current work from bitcoind over RPC and look up the height
    of its previous block in the header tracker.

    Fires the deferred with a (BlockAttempt, height) tuple. Retries up
    to 3 times on failure (via deferral.retry).
    """
    # a block could arrive in between these two queries
    work = bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork()))
    try:
        height = ht.getHeight(work.previous_block)
    except ValueError:
        # previous block not known to the height tracker yet; fall back
        # to a placeholder (XXX in original — presumably good enough for
        # the subsidy calculation, but verify)
        height = 1000 # XXX
    defer.returnValue((work, height))
39
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind, over its P2P protocol, for a payout script by
    submitting a null checkorder.

    Fires the deferred with the script on a 'success' reply, with None
    on 'denied', and raises ValueError for anything else.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    if reply == 'denied':
        defer.returnValue(None)
    if reply != 'success':
        raise ValueError('Unexpected reply: %r' % (res,))
    defer.returnValue(res['script'])
50
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Fallback payout script: ask bitcoind for the 'p2pool' account's
    address over RPC and convert it to an output script for `net`.
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
55
56 @defer.inlineCallbacks
57 def main(args):
58     try:
59         if args.charts:
60             from . import draw
61         
62         print 'p2pool (version %s)' % (p2pool_init.__version__,)
63         print
64         
65         # connect to bitcoind over JSON-RPC and do initial getwork
66         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
67         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
68         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
69         good = yield args.net.BITCOIN_RPC_CHECK(bitcoind)
70         if not good:
71             print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
72             return
73         temp_work = bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork()))
74         print '    ...success!'
75         print '    Current block hash: %x' % (temp_work.previous_block,)
76         print
77         
78         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
79         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
80         factory = bitcoin.p2p.ClientFactory(args.net)
81         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
82         my_script = yield get_payout_script(factory)
83         if args.pubkey_hash is None:
84             if my_script is None:
85                 print '    IP transaction denied ... falling back to sending to address.'
86                 my_script = yield get_payout_script2(bitcoind, args.net)
87         else:
88             my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
89         print '    ...success!'
90         print '    Payout script:', my_script.encode('hex')
91         print
92         
93         print 'Loading cached block headers...'
94         ht = bitcoin.p2p.HeightTracker(factory, args.net.HEADERSTORE_FILENAME)
95         print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
96         print
97         
98         tracker = p2pool.OkayTracker(args.net)
99         ss = p2pool.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), args.net.SHARESTORE_FILENAME), args.net)
100         known_verified = set()
101         print "Loading shares..."
102         for i, (mode, contents) in enumerate(ss.get_shares()):
103             if mode == 'share':
104                 if contents.hash in tracker.shares:
105                     continue
106                 contents.shared = True
107                 contents.stored = True
108                 tracker.add(contents)
109                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
110                     print "    %i" % (len(tracker.shares),)
111             elif mode == 'verified_hash':
112                 known_verified.add(contents)
113             else:
114                 raise AssertionError()
115         print "    ...inserting %i verified shares..." % (len(known_verified),)
116         for h in known_verified:
117             if h not in tracker.shares:
118                 continue
119             tracker.verified.add(tracker.shares[h])
120         print "    ...done loading %i shares!" % (len(tracker.shares),)
121         print
122         tracker.added.watch(ss.add_share)
123         tracker.verified.added.watch(lambda share: ss.add_verified_hash(share.hash))
124         
125         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
126         
127         # information affecting work that should trigger a long-polling update
128         current_work = variable.Variable(None)
129         # information affecting work that should not trigger a long-polling update
130         current_work2 = variable.Variable(None)
131         
132         work_updated = variable.Event()
133         
134         requested = expiring_dict.ExpiringDict(300)
135         
136         @defer.inlineCallbacks
137         def set_real_work1():
138             work, height = yield getwork(bitcoind, ht)
139             changed = work.previous_block != current_work.value['previous_block'] if current_work.value is not None else True
140             current_work.set(dict(
141                 version=work.version,
142                 previous_block=work.previous_block,
143                 target=work.target,
144                 height=height,
145                 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
146             ))
147             current_work2.set(dict(
148                 clock_offset=time.time() - work.timestamp,
149                 last_update=time.time(),
150             ))
151             if changed:
152                 set_real_work2()
153         
154         def set_real_work2():
155             best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
156             
157             t = dict(current_work.value)
158             t['best_share_hash'] = best
159             current_work.set(t)
160             
161             t = time.time()
162             for peer2, share_hash in desired:
163                 if share_hash not in tracker.tails: # was received in the time tracker.think was running
164                     continue
165                 last_request_time, count = requested.get(share_hash, (None, 0))
166                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
167                     continue
168                 potential_peers = set()
169                 for head in tracker.tails[share_hash]:
170                     potential_peers.update(peer_heads.get(head, set()))
171                 potential_peers = [peer for peer in potential_peers if peer.connected2]
172                 if count == 0 and peer2 is not None and peer2.connected2:
173                     peer = peer2
174                 else:
175                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
176                     if peer is None:
177                         continue
178                 
179                 print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
180                 peer.send_getshares(
181                     hashes=[share_hash],
182                     parents=2000,
183                     stops=list(set(tracker.heads) | set(
184                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
185                     ))[:100],
186                 )
187                 requested[share_hash] = t, count + 1
188         
189         print 'Initializing work...'
190         yield set_real_work1()
191         set_real_work2()
192         print '    ...success!'
193         print
194         
195         start_time = time.time() - current_work2.value['clock_offset']
196         
197         # setup p2p logic and join p2pool network
198         
199         def share_share(share, ignore_peer=None):
200             for peer in p2p_node.peers.itervalues():
201                 if peer is ignore_peer:
202                     continue
203                 #if p2pool_init.DEBUG:
204                 #    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
205                 peer.send_shares([share])
206             share.flag_shared()
207         
208         def p2p_shares(shares, peer=None):
209             if len(shares) > 5:
210                 print 'Processing %i shares...' % (len(shares),)
211             
212             some_new = False
213             for share in shares:
214                 if share.hash in tracker.shares:
215                     #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
216                     continue
217                 some_new = True
218                 
219                 #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
220                 
221                 tracker.add(share)
222                 #for peer2, share_hash in desired:
223                 #    print 'Requesting parent share %x' % (share_hash,)
224                 #    peer2.send_getshares(hashes=[share_hash], parents=2000)
225                 
226                 if share.bitcoin_hash <= share.header['target']:
227                     print
228                     print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
229                     print
230                     if factory.conn.value is not None:
231                         factory.conn.value.send_block(block=share.as_block(tracker, args.net))
232                     else:
233                         print 'No bitcoind connection! Erp!'
234             
235             if shares and peer is not None:
236                 peer_heads.setdefault(shares[0].hash, set()).add(peer)
237             
238             if some_new:
239                 set_real_work2()
240             
241             if len(shares) > 5:
242                 print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
243         
244         def p2p_share_hashes(share_hashes, peer):
245             t = time.time()
246             get_hashes = []
247             for share_hash in share_hashes:
248                 if share_hash in tracker.shares:
249                     continue
250                 last_request_time, count = requested.get(share_hash, (None, 0))
251                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
252                     continue
253                 print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
254                 get_hashes.append(share_hash)
255                 requested[share_hash] = t, count + 1
256             
257             if share_hashes and peer is not None:
258                 peer_heads.setdefault(share_hashes[0], set()).add(peer)
259             if get_hashes:
260                 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
261         
262         def p2p_get_shares(share_hashes, parents, stops, peer):
263             parents = min(parents, 1000//len(share_hashes))
264             stops = set(stops)
265             shares = []
266             for share_hash in share_hashes:
267                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
268                     if share.hash in stops:
269                         break
270                     shares.append(share)
271             print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
272             peer.send_shares(shares, full=True)
273         
274         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
275         
276         def parse(x):
277             if ':' in x:
278                 ip, port = x.split(':')
279                 return ip, int(port)
280             else:
281                 return x, args.net.P2P_PORT
282         
283         nodes = set([
284             ('72.14.191.28', args.net.P2P_PORT),
285             ('62.204.197.159', args.net.P2P_PORT),
286             ('142.58.248.28', args.net.P2P_PORT),
287             ('94.23.34.145', args.net.P2P_PORT),
288         ])
289         for host in [
290             'p2pool.forre.st',
291             'dabuttonfactory.com',
292         ]:
293             try:
294                 nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
295             except:
296                 log.err(None, 'Error resolving bootstrap node IP:')
297         
298         p2p_node = p2p.Node(
299             current_work=current_work,
300             port=args.p2pool_port,
301             net=args.net,
302             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
303             mode=0 if args.low_bandwidth else 1,
304             preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
305         )
306         p2p_node.handle_shares = p2p_shares
307         p2p_node.handle_share_hashes = p2p_share_hashes
308         p2p_node.handle_get_shares = p2p_get_shares
309         
310         p2p_node.start()
311         
312         # send share when the chain changes to their chain
313         def work_changed(new_work):
314             #print 'Work changed:', new_work
315             for share in tracker.get_chain_known(new_work['best_share_hash']):
316                 if share.shared:
317                     break
318                 share_share(share, share.peer)
319         current_work.changed.watch(work_changed)
320         
321         print '    ...success!'
322         print
323         
324         @defer.inlineCallbacks
325         def upnp_thread():
326             while True:
327                 try:
328                     is_lan, lan_ip = yield ipdiscover.get_local_ip()
329                     if is_lan:
330                         pm = yield portmapper.get_port_mapper()
331                         yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
332                 except:
333                     if p2pool_init.DEBUG:
334                         log.err(None, "UPnP error:")
335                 yield deferral.sleep(random.expovariate(1/120))
336         
337         if args.upnp:
338             upnp_thread()
339         
340         # start listening for workers with a JSON-RPC server
341         
342         print 'Listening for workers on port %i...' % (args.worker_port,)
343         
344         # setup worker logic
345         
346         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
347         run_identifier = struct.pack('<Q', random.randrange(2**64))
348         
349         share_counter = skiplists.CountsSkipList(tracker, run_identifier)
350         removed_unstales = set()
351         def get_share_counts():
352             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
353             matching_in_chain = share_counter(current_work.value['best_share_hash'], height) | removed_unstales
354             shares_in_chain = my_shares & matching_in_chain
355             stale_shares = my_shares - matching_in_chain
356             return len(shares_in_chain) + len(stale_shares), len(stale_shares)
357         @tracker.verified.removed.watch
358         def _(share):
359             if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
360                 removed_unstales.add(share.hash)
361         
362         def compute(state, payout_script):
363             if payout_script is None:
364                 payout_script = my_script
365             if state['best_share_hash'] is None and args.net.PERSIST:
366                 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
367             if len(p2p_node.peers) == 0 and args.net.PERSIST:
368                 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
369             if time.time() > current_work2.value['last_update'] + 60:
370                 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
371             pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
372             pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
373             extra_txs = []
374             size = 0
375             for tx in pre_extra_txs:
376                 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
377                 if size + this_size > 500000:
378                     break
379                 extra_txs.append(tx)
380                 size += this_size
381             # XXX check sigops!
382             # XXX assuming generate_tx is smallish here..
383             def get_stale_frac():
384                 shares, stale_shares = get_share_counts()
385                 if shares == 0:
386                     return ""
387                 frac = stale_shares/shares
388                 return 2*struct.pack('<H', int(65535*frac + .5))
389             generate_tx = p2pool.generate_transaction(
390                 tracker=tracker,
391                 previous_share_hash=state['best_share_hash'],
392                 new_script=payout_script,
393                 subsidy=args.net.BITCOIN_SUBSIDY_FUNC(state['height']) + sum(tx.value_in - tx.value_out for tx in extra_txs),
394                 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)) + get_stale_frac(),
395                 block_target=state['target'],
396                 net=args.net,
397             )
398             print 'Generating! Difficulty: %.06f Payout if block: %.6f %s' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8, args.net.BITCOIN_SYMBOL)
399             #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
400             #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
401             transactions = [generate_tx] + [tx.tx for tx in extra_txs]
402             merkle_root = bitcoin.data.merkle_hash(transactions)
403             merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
404             
405             timestamp = int(time.time() - current_work2.value['clock_offset'])
406             if state['best_share_hash'] is not None:
407                 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
408                 if timestamp2 > timestamp:
409                     print 'Toff', timestamp2 - timestamp
410                     timestamp = timestamp2
411             target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
412             times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
413             #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
414             return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
415         
416         my_shares = set()
417         times = {}
418         
419         def got_response(data):
420             try:
421                 # match up with transactions
422                 header = bitcoin.getwork.decode_data(data)
423                 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
424                 if transactions is None:
425                     print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
426                     return False
427                 block = dict(header=header, txs=transactions)
428                 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
429                 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
430                     if factory.conn.value is not None:
431                         factory.conn.value.send_block(block=block)
432                     else:
433                         print 'No bitcoind connection! Erp!'
434                     if hash_ <= block['header']['target']:
435                         print
436                         print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
437                         print
438                 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
439                 if hash_ > target:
440                     print 'Received invalid share from worker - %x/%x' % (hash_, target)
441                     return False
442                 share = p2pool.Share.from_block(block)
443                 my_shares.add(share.hash)
444                 print 'GOT SHARE! %s prev %s age %.2fs' % (p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
445                 good = share.previous_hash == current_work.value['best_share_hash']
446                 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
447                 p2p_shares([share])
448                 # eg. good = share.hash == current_work.value['best_share_hash'] here
449                 return good
450             except:
451                 log.err(None, 'Error processing data received from worker:')
452                 return False
453         
454         web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
455         
456         def get_rate():
457             if current_work.value['best_share_hash'] is not None:
458                 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
459                 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
460                 return json.dumps(att_s)
461             return json.dumps(None)
462         
463         def get_users():
464             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
465             weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
466             res = {}
467             for script in sorted(weights, key=lambda s: weights[s]):
468                 res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
469             return json.dumps(res)
470         
471         class WebInterface(resource.Resource):
472             def __init__(self, func, mime_type):
473                 self.func, self.mime_type = func, mime_type
474             
475             def render_GET(self, request):
476                 request.setHeader('Content-Type', self.mime_type)
477                 return self.func()
478         
479         web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
480         web_root.putChild('users', WebInterface(get_users, 'application/json'))
481         if args.charts:
482             web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
483         
484         reactor.listenTCP(args.worker_port, server.Site(web_root))
485         
486         print '    ...success!'
487         print
488         
489         # done!
490         
491         tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
492         get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
493         
494         class Tx(object):
495             def __init__(self, tx, seen_at_block):
496                 self.hash = bitcoin.data.tx_type.hash256(tx)
497                 self.tx = tx
498                 self.seen_at_block = seen_at_block
499                 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
500                 #print
501                 #print '%x %r' % (seen_at_block, tx)
502                 #for mention in self.mentions:
503                 #    print '%x' % mention
504                 #print
505                 self.parents_all_in_blocks = False
506                 self.value_in = 0
507                 #print self.tx
508                 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
509                 self._find_parents_in_blocks()
510             
511             @defer.inlineCallbacks
512             def _find_parents_in_blocks(self):
513                 for tx_in in self.tx['tx_ins']:
514                     try:
515                         raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
516                     except Exception:
517                         return
518                     self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
519                     #print raw_transaction
520                     if not raw_transaction['parent_blocks']:
521                         return
522                 self.parents_all_in_blocks = True
523             
524             def is_good(self):
525                 if not self.parents_all_in_blocks:
526                     return False
527                 x = self.is_good2()
528                 #print 'is_good:', x
529                 return x
530         
531         @defer.inlineCallbacks
532         def new_tx(tx_hash):
533             try:
534                 assert isinstance(tx_hash, (int, long))
535                 #print 'REQUESTING', tx_hash
536                 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
537                 #print 'GOT', tx
538                 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
539             except:
540                 log.err(None, 'Error handling tx:')
541         # disable for now, for testing impact on stales
542         #factory.new_tx.watch(new_tx)
543         
544         def new_block(block_hash):
545             work_updated.happened()
546         factory.new_block.watch(new_block)
547         
548         print 'Started successfully!'
549         print
550         
551         ht.updated.watch(set_real_work2)
552         
553         @defer.inlineCallbacks
554         def work1_thread():
555             while True:
556                 flag = work_updated.get_deferred()
557                 try:
558                     yield set_real_work1()
559                 except:
560                     log.err()
561                 yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
562         
563         @defer.inlineCallbacks
564         def work2_thread():
565             while True:
566                 try:
567                     set_real_work2()
568                 except:
569                     log.err()
570                 yield deferral.sleep(random.expovariate(1/20))
571         
572         work1_thread()
573         work2_thread()
574         
575         
576         def watchdog_handler(signum, frame):
577             print "Watchdog timer went off at:"
578             traceback.print_exc()
579         
580         signal.signal(signal.SIGALRM, watchdog_handler)
581         task.LoopingCall(signal.alarm, 30).start(1)
582         
583         
584         def read_stale_frac(share):
585             if len(share.nonce) != 20:
586                 return None
587             a, b = struct.unpack("<HH", share.nonce[-4:])
588             if a != b:
589                 return None
590             return a/65535
591         
592         while True:
593             yield deferral.sleep(3)
594             try:
595                 if current_work.value['best_share_hash'] is not None:
596                     height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
597                     if height > 2:
598                         att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 120))
599                         weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
600                         shares, stale_shares = get_share_counts()
601                         print 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
602                             math.format(att_s),
603                             height,
604                             len(tracker.verified.shares),
605                             len(tracker.shares),
606                             weights.get(my_script, 0)/total_weight*100,
607                             math.format(weights.get(my_script, 0)/total_weight*att_s),
608                             shares,
609                             stale_shares,
610                             len(p2p_node.peers),
611                         ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
612                         fracs = [read_stale_frac(share) for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if read_stale_frac(share) is not None]
613                         if fracs:
614                             med = math.median(fracs)
615                             print 'Median stale proportion:', med
616                             if shares:
617                                 print '    Own:', stale_shares/shares
618                                 if med < .99:
619                                     print '    Own efficiency: %.02f%%' % (100*(1 - stale_shares/shares)/(1 - med),)
620                             
621                             
622             except:
623                 log.err()
624     except:
625         log.err(None, 'Fatal error:')
626     finally:
627         reactor.stop()
628
def run():
    """Entry point: parse command-line options, set up optional debug
    logging, resolve per-network port defaults, and start the twisted
    reactor with main() scheduled to run.
    """
    # fromfile_prefix_chars='@' lets users pass '@somefile' containing options.
    parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,), fromfile_prefix_chars='@')
    # Override argparse's default one-argument-per-line handling of @files so
    # each line may hold several whitespace-separated arguments.
    parser.convert_arg_line_to_args = lambda arg_line: (arg for arg in arg_line.split() if arg.strip())
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    parser.add_argument('--net',
        help='use specified network (choices: bitcoin (default), namecoin, ixcoin)',
        action='store', choices=set(['bitcoin', 'namecoin', 'ixcoin']), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='use the testnet',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--charts',
        help='generate charts on the web interface (requires PIL and pygame)',
        action='store_const', const=True, default=False, dest='charts')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    # default=None so the network-specific port can be filled in after the
    # network is known (see the args.net resolution below).
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
        type=int, action='store', default=9332, dest='worker_port')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    # RPC/P2P ports also default to None and are resolved from the selected
    # network below (e.g. ixcoin uses a different RPC port than bitcoin).
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332, 8338 for ixcoin)',
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # Positional (required) credentials for the bitcoind RPC interface.
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
        help='bitcoind RPC interface username',
        type=str, action='store', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool_init.DEBUG = True
        class ReopeningFile(object):
            """File-like wrapper that can close and reopen its underlying
            file in place — used for 'debug.log' rotation via SIGUSR1.
            """
            def __init__(self, *open_args, **open_kwargs):
                # Remember the open() arguments so reopen() can repeat them.
                self.open_args, self.open_kwargs = open_args, open_kwargs
                self.inner_file = open(*self.open_args, **self.open_kwargs)
            def reopen(self):
                self.inner_file.close()
                self.inner_file = open(*self.open_args, **self.open_kwargs)
            def write(self, data):
                self.inner_file.write(data)
            def flush(self):
                self.inner_file.flush()
        class TeePipe(object):
            """File-like object that duplicates writes to several outputs
            (here: the real stderr plus the debug log file).
            """
            def __init__(self, outputs):
                self.outputs = outputs
            def write(self, data):
                for output in self.outputs:
                    output.write(data)
            def flush(self):
                for output in self.outputs:
                    output.flush()
        class TimestampingPipe(object):
            """File-like object that buffers partial lines and prefixes each
            complete line with a HH:MM:SS.microseconds timestamp before
            forwarding it to inner_file.
            """
            def __init__(self, inner_file):
                self.inner_file = inner_file
                self.buf = ''  # holds the trailing partial line between writes
                self.softspace = 0  # required by the Python 2 print statement
            def write(self, data):
                buf = self.buf + data
                lines = buf.split('\n')
                # Emit every complete line with a timestamp; flush per line so
                # the log stays current even if the process dies.
                for line in lines[:-1]:
                    self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
                    self.inner_file.flush()
                self.buf = lines[-1]
            def flush(self):
                # Intentional no-op: complete lines are flushed in write(),
                # and flushing here would emit a timestamp mid-line.
                pass
        # 'debug.log' lives next to the script; 'w' truncates any previous log.
        logfile = ReopeningFile(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')
        # Route all prints, tracebacks, and twisted log output through the
        # timestamping tee (console + log file).
        sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
        # SIGUSR1 is unavailable on some platforms (e.g. Windows), hence the guard.
        if hasattr(signal, "SIGUSR1"):
            def sigusr1(signum, frame):
                print '''Caught SIGUSR1, closing 'debug.log'...'''
                logfile.reopen()
                print '''...and reopened 'debug.log' after catching SIGUSR1.'''
            signal.signal(signal.SIGUSR1, sigusr1)
    
    # Map (coin name, testnet flag) to the network-definition object, which
    # carries the per-network constants used for the port defaults below.
    args.net = {
        ('bitcoin', False): p2pool.Mainnet,
        ('bitcoin', True): p2pool.Testnet,
        ('namecoin', False): p2pool.NamecoinMainnet,
        ('namecoin', True): p2pool.NamecoinTestnet,
        ('ixcoin', False): p2pool.IxcoinMainnet,
        ('ixcoin', True): p2pool.IxcoinTestnet,
    }[args.net_name, args.testnet]
    
    # Fill in any ports the user left unset with the selected network's
    # defaults (per the help text: e.g. RPC 8332 for bitcoin, 8338 for ixcoin).
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = args.net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception, e:
            # Re-raise with context so the user sees which input was bad.
            raise ValueError('error parsing address: ' + repr(e))
    else:
        # No address given; main() will request one from bitcoind instead
        # (per the --address help text).
        args.pubkey_hash = None
    
    reactor.callWhenRunning(main, args)
    reactor.run()