automatically generate choices for --net option
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import datetime
7 import itertools
8 import os
9 import random
10 import sqlite3
11 import struct
12 import sys
13 import time
14 import json
15 import signal
16 import traceback
17
18 from twisted.internet import defer, reactor, task, threads
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
24 from util import db, expiring_dict, jsonrpc, variable, deferral, math
25 from . import p2p, worker_interface, skiplists
26 import p2pool.data as p2pool
27 import p2pool as p2pool_init
28
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, ht):
    """Fetch the current work from bitcoind together with a block height.

    bitcoind must provide rpc_getwork(); ht must provide getHeight().
    Fires with a (work, height) tuple, where work is the BlockAttempt
    built from the getwork reply.
    """
    raw_reply = yield bitcoind.rpc_getwork()
    work = bitcoin.getwork.BlockAttempt.from_getwork(raw_reply)
    # a block could arrive between the getwork call above and this lookup,
    # in which case the height tracker may not know the previous block yet
    try:
        height = ht.getHeight(work.previous_block)
    except ValueError:
        height = 1000 # XXX placeholder used when the height lookup fails
    result = (work, height)
    defer.returnValue(result)
39
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind (via its P2P checkorder message) for a payout script.

    Fires with the script from the reply on 'success', with None when the
    reply is 'denied', and raises ValueError for any other reply.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    if reply not in ('success', 'denied'):
        raise ValueError('Unexpected reply: %r' % (res,))
    # only a successful reply is expected to carry a script
    script = res['script'] if reply == 'success' else None
    defer.returnValue(script)
50
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a payout script from bitcoind's 'p2pool' account address.

    Requests the address over JSON-RPC, converts it to a pubkey hash for
    the given net and fires with the corresponding script.
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    script = bitcoin.data.pubkey_hash_to_script2(pubkey_hash)
    defer.returnValue(script)
55
56 @defer.inlineCallbacks
57 def main(args):
58     try:
59         if args.charts:
60             from . import draw
61         
62         print 'p2pool (version %s)' % (p2pool_init.__version__,)
63         print
64         
65         # connect to bitcoind over JSON-RPC and do initial getwork
66         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
67         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
68         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
69         good = yield deferral.retry('Error while checking bitcoind identity:', 1)(args.net.BITCOIN_RPC_CHECK)(bitcoind)
70         if not good:
71             print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
72             return
73         temp_work = yield deferral.retry('Error while testing getwork:', 1)(defer.inlineCallbacks(lambda: defer.returnValue(bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork())))))()
74         print '    ...success!'
75         print '    Current block hash: %x' % (temp_work.previous_block,)
76         print
77         
78         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
79         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
80         factory = bitcoin.p2p.ClientFactory(args.net)
81         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
82         my_script = yield get_payout_script(factory)
83         if args.pubkey_hash is None:
84             if my_script is None:
85                 print '    IP transaction denied ... falling back to sending to address.'
86                 my_script = yield get_payout_script2(bitcoind, args.net)
87         else:
88             my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
89         print '    ...success!'
90         print '    Payout script:', my_script.encode('hex')
91         print
92         
93         print 'Loading cached block headers...'
94         ht = bitcoin.p2p.HeightTracker(factory, args.net.NAME + '_headers.dat')
95         print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
96         print
97         
98         tracker = p2pool.OkayTracker(args.net)
99         ss = p2pool.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), args.net.NAME + '_shares.'), args.net)
100         known_verified = set()
101         print "Loading shares..."
102         for i, (mode, contents) in enumerate(ss.get_shares()):
103             if mode == 'share':
104                 if contents.hash in tracker.shares:
105                     continue
106                 contents.shared = True
107                 contents.stored = True
108                 tracker.add(contents)
109                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
110                     print "    %i" % (len(tracker.shares),)
111             elif mode == 'verified_hash':
112                 known_verified.add(contents)
113             else:
114                 raise AssertionError()
115         print "    ...inserting %i verified shares..." % (len(known_verified),)
116         for h in known_verified:
117             if h not in tracker.shares:
118                 continue
119             tracker.verified.add(tracker.shares[h])
120         print "    ...done loading %i shares!" % (len(tracker.shares),)
121         print
122         tracker.added.watch(lambda share: ss.add_share(share))
123         tracker.verified.added.watch(lambda share: ss.add_verified_hash(share.hash))
124         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
125         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
126         
127         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
128         
129         # information affecting work that should trigger a long-polling update
130         current_work = variable.Variable(None)
131         # information affecting work that should not trigger a long-polling update
132         current_work2 = variable.Variable(None)
133         
134         work_updated = variable.Event()
135         
136         requested = expiring_dict.ExpiringDict(300)
137         
138         @defer.inlineCallbacks
139         def set_real_work1():
140             work, height = yield getwork(bitcoind, ht)
141             changed = work.previous_block != current_work.value['previous_block'] if current_work.value is not None else True
142             current_work.set(dict(
143                 version=work.version,
144                 previous_block=work.previous_block,
145                 target=work.target,
146                 height=height,
147                 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
148             ))
149             current_work2.set(dict(
150                 clock_offset=time.time() - work.timestamp,
151                 last_update=time.time(),
152             ))
153             if changed:
154                 set_real_work2()
155         
156         def set_real_work2():
157             best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
158             
159             t = dict(current_work.value)
160             t['best_share_hash'] = best
161             current_work.set(t)
162             
163             t = time.time()
164             for peer2, share_hash in desired:
165                 if share_hash not in tracker.tails: # was received in the time tracker.think was running
166                     continue
167                 last_request_time, count = requested.get(share_hash, (None, 0))
168                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
169                     continue
170                 potential_peers = set()
171                 for head in tracker.tails[share_hash]:
172                     potential_peers.update(peer_heads.get(head, set()))
173                 potential_peers = [peer for peer in potential_peers if peer.connected2]
174                 if count == 0 and peer2 is not None and peer2.connected2:
175                     peer = peer2
176                 else:
177                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
178                     if peer is None:
179                         continue
180                 
181                 print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
182                 peer.send_getshares(
183                     hashes=[share_hash],
184                     parents=2000,
185                     stops=list(set(tracker.heads) | set(
186                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
187                     ))[:100],
188                 )
189                 requested[share_hash] = t, count + 1
190         
191         print 'Initializing work...'
192         yield set_real_work1()
193         set_real_work2()
194         print '    ...success!'
195         print
196         
197         start_time = time.time() - current_work2.value['clock_offset']
198         
199         # setup p2p logic and join p2pool network
200         
201         def share_share(share, ignore_peer=None):
202             for peer in p2p_node.peers.itervalues():
203                 if peer is ignore_peer:
204                     continue
205                 #if p2pool_init.DEBUG:
206                 #    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
207                 peer.send_shares([share])
208             share.flag_shared()
209         
210         def p2p_shares(shares, peer=None):
211             if len(shares) > 5:
212                 print 'Processing %i shares...' % (len(shares),)
213             
214             some_new = False
215             for share in shares:
216                 if share.hash in tracker.shares:
217                     #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
218                     continue
219                 some_new = True
220                 
221                 #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
222                 
223                 tracker.add(share)
224                 #for peer2, share_hash in desired:
225                 #    print 'Requesting parent share %x' % (share_hash,)
226                 #    peer2.send_getshares(hashes=[share_hash], parents=2000)
227                 
228                 if share.bitcoin_hash <= share.header['target']:
229                     print
230                     print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
231                     print
232                     if factory.conn.value is not None:
233                         factory.conn.value.send_block(block=share.as_block(tracker, args.net))
234                     else:
235                         print 'No bitcoind connection! Erp!'
236             
237             if shares and peer is not None:
238                 peer_heads.setdefault(shares[0].hash, set()).add(peer)
239             
240             if some_new:
241                 set_real_work2()
242             
243             if len(shares) > 5:
244                 print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
245         
246         def p2p_share_hashes(share_hashes, peer):
247             t = time.time()
248             get_hashes = []
249             for share_hash in share_hashes:
250                 if share_hash in tracker.shares:
251                     continue
252                 last_request_time, count = requested.get(share_hash, (None, 0))
253                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
254                     continue
255                 print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
256                 get_hashes.append(share_hash)
257                 requested[share_hash] = t, count + 1
258             
259             if share_hashes and peer is not None:
260                 peer_heads.setdefault(share_hashes[0], set()).add(peer)
261             if get_hashes:
262                 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
263         
264         def p2p_get_shares(share_hashes, parents, stops, peer):
265             parents = min(parents, 1000//len(share_hashes))
266             stops = set(stops)
267             shares = []
268             for share_hash in share_hashes:
269                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
270                     if share.hash in stops:
271                         break
272                     shares.append(share)
273             print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
274             peer.send_shares(shares, full=True)
275         
276         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
277         
278         def parse(x):
279             if ':' in x:
280                 ip, port = x.split(':')
281                 return ip, int(port)
282             else:
283                 return x, args.net.P2P_PORT
284         
285         nodes = set([
286             ('72.14.191.28', args.net.P2P_PORT),
287             ('62.204.197.159', args.net.P2P_PORT),
288             ('142.58.248.28', args.net.P2P_PORT),
289             ('94.23.34.145', args.net.P2P_PORT),
290         ])
291         for host in [
292             'p2pool.forre.st',
293             'dabuttonfactory.com',
294         ]:
295             try:
296                 nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
297             except:
298                 log.err(None, 'Error resolving bootstrap node IP:')
299         
300         p2p_node = p2p.Node(
301             current_work=current_work,
302             port=args.p2pool_port,
303             net=args.net,
304             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.NAME),
305             mode=0 if args.low_bandwidth else 1,
306             preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
307         )
308         p2p_node.handle_shares = p2p_shares
309         p2p_node.handle_share_hashes = p2p_share_hashes
310         p2p_node.handle_get_shares = p2p_get_shares
311         
312         p2p_node.start()
313         
314         # send share when the chain changes to their chain
315         def work_changed(new_work):
316             #print 'Work changed:', new_work
317             for share in tracker.get_chain_known(new_work['best_share_hash']):
318                 if share.shared:
319                     break
320                 share_share(share, share.peer)
321         current_work.changed.watch(work_changed)
322         
323         print '    ...success!'
324         print
325         
326         @defer.inlineCallbacks
327         def upnp_thread():
328             while True:
329                 try:
330                     is_lan, lan_ip = yield ipdiscover.get_local_ip()
331                     if is_lan:
332                         pm = yield portmapper.get_port_mapper()
333                         yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
334                 except:
335                     if p2pool_init.DEBUG:
336                         log.err(None, "UPnP error:")
337                 yield deferral.sleep(random.expovariate(1/120))
338         
339         if args.upnp:
340             upnp_thread()
341         
342         # start listening for workers with a JSON-RPC server
343         
344         print 'Listening for workers on port %i...' % (args.worker_port,)
345         
346         # setup worker logic
347         
348         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
349         run_identifier = struct.pack('<Q', random.randrange(2**64))
350         
351         share_counter = skiplists.CountsSkipList(tracker, run_identifier)
352         removed_unstales = set()
353         def get_share_counts():
354             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
355             matching_in_chain = share_counter(current_work.value['best_share_hash'], height) | removed_unstales
356             shares_in_chain = my_shares & matching_in_chain
357             stale_shares = my_shares - matching_in_chain
358             return len(shares_in_chain) + len(stale_shares), len(stale_shares)
359         @tracker.verified.removed.watch
360         def _(share):
361             if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
362                 removed_unstales.add(share.hash)
363         
364         def compute(state, payout_script):
365             if payout_script is None:
366                 payout_script = my_script
367             if state['best_share_hash'] is None and args.net.PERSIST:
368                 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
369             if len(p2p_node.peers) == 0 and args.net.PERSIST:
370                 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
371             if time.time() > current_work2.value['last_update'] + 60:
372                 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
373             pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
374             pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
375             extra_txs = []
376             size = 0
377             for tx in pre_extra_txs:
378                 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
379                 if size + this_size > 500000:
380                     break
381                 extra_txs.append(tx)
382                 size += this_size
383             # XXX check sigops!
384             # XXX assuming generate_tx is smallish here..
385             def get_stale_frac():
386                 shares, stale_shares = get_share_counts()
387                 if shares == 0:
388                     return ""
389                 frac = stale_shares/shares
390                 return 2*struct.pack('<H', int(65535*frac + .5))
391             generate_tx = p2pool.generate_transaction(
392                 tracker=tracker,
393                 previous_share_hash=state['best_share_hash'],
394                 new_script=payout_script,
395                 subsidy=args.net.BITCOIN_SUBSIDY_FUNC(state['height']) + sum(tx.value_in - tx.value_out for tx in extra_txs),
396                 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)) + get_stale_frac(),
397                 block_target=state['target'],
398                 net=args.net,
399             )
400             print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8, args.net.BITCOIN_SYMBOL)
401             #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
402             #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
403             transactions = [generate_tx] + [tx.tx for tx in extra_txs]
404             merkle_root = bitcoin.data.merkle_hash(transactions)
405             merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
406             
407             timestamp = int(time.time() - current_work2.value['clock_offset'])
408             if state['best_share_hash'] is not None:
409                 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
410                 if timestamp2 > timestamp:
411                     print 'Toff', timestamp2 - timestamp
412                     timestamp = timestamp2
413             target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
414             times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
415             #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
416             return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
417         
418         my_shares = set()
419         times = {}
420         
421         def got_response(data, user):
422             try:
423                 # match up with transactions
424                 header = bitcoin.getwork.decode_data(data)
425                 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
426                 if transactions is None:
427                     print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
428                     return False
429                 block = dict(header=header, txs=transactions)
430                 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
431                 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
432                     if factory.conn.value is not None:
433                         factory.conn.value.send_block(block=block)
434                     else:
435                         print 'No bitcoind connection! Erp!'
436                     if hash_ <= block['header']['target']:
437                         print
438                         print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
439                         print
440                 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
441                 if hash_ > target:
442                     print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (hash_, target)
443                     return False
444                 share = p2pool.Share.from_block(block)
445                 my_shares.add(share.hash)
446                 print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
447                 good = share.previous_hash == current_work.value['best_share_hash']
448                 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
449                 p2p_shares([share])
450                 # eg. good = share.hash == current_work.value['best_share_hash'] here
451                 return good
452             except:
453                 log.err(None, 'Error processing data received from worker:')
454                 return False
455         
456         web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
457         
458         def get_rate():
459             if current_work.value['best_share_hash'] is not None:
460                 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
461                 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
462                 return json.dumps(att_s)
463             return json.dumps(None)
464         
465         def get_users():
466             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
467             weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
468             res = {}
469             for script in sorted(weights, key=lambda s: weights[s]):
470                 res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
471             return json.dumps(res)
472         
473         class WebInterface(resource.Resource):
474             def __init__(self, func, mime_type):
475                 self.func, self.mime_type = func, mime_type
476             
477             def render_GET(self, request):
478                 request.setHeader('Content-Type', self.mime_type)
479                 return self.func()
480         
481         web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
482         web_root.putChild('users', WebInterface(get_users, 'application/json'))
483         if args.charts:
484             web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
485         
486         reactor.listenTCP(args.worker_port, server.Site(web_root))
487         
488         print '    ...success!'
489         print
490         
491         # done!
492         
493         tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
494         get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
495         
496         class Tx(object):
497             def __init__(self, tx, seen_at_block):
498                 self.hash = bitcoin.data.tx_type.hash256(tx)
499                 self.tx = tx
500                 self.seen_at_block = seen_at_block
501                 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
502                 #print
503                 #print '%x %r' % (seen_at_block, tx)
504                 #for mention in self.mentions:
505                 #    print '%x' % mention
506                 #print
507                 self.parents_all_in_blocks = False
508                 self.value_in = 0
509                 #print self.tx
510                 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
511                 self._find_parents_in_blocks()
512             
513             @defer.inlineCallbacks
514             def _find_parents_in_blocks(self):
515                 for tx_in in self.tx['tx_ins']:
516                     try:
517                         raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
518                     except Exception:
519                         return
520                     self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
521                     #print raw_transaction
522                     if not raw_transaction['parent_blocks']:
523                         return
524                 self.parents_all_in_blocks = True
525             
526             def is_good(self):
527                 if not self.parents_all_in_blocks:
528                     return False
529                 x = self.is_good2()
530                 #print 'is_good:', x
531                 return x
532         
533         @defer.inlineCallbacks
534         def new_tx(tx_hash):
535             try:
536                 assert isinstance(tx_hash, (int, long))
537                 #print 'REQUESTING', tx_hash
538                 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
539                 #print 'GOT', tx
540                 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
541             except:
542                 log.err(None, 'Error handling tx:')
543         # disable for now, for testing impact on stales
544         #factory.new_tx.watch(new_tx)
545         
546         def new_block(block_hash):
547             work_updated.happened()
548         factory.new_block.watch(new_block)
549         
550         print 'Started successfully!'
551         print
552         
553         ht.updated.watch(set_real_work2)
554         
555         @defer.inlineCallbacks
556         def work1_thread():
557             while True:
558                 flag = work_updated.get_deferred()
559                 try:
560                     yield set_real_work1()
561                 except:
562                     log.err()
563                 yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
564         
565         @defer.inlineCallbacks
566         def work2_thread():
567             while True:
568                 try:
569                     set_real_work2()
570                 except:
571                     log.err()
572                 yield deferral.sleep(random.expovariate(1/20))
573         
574         work1_thread()
575         work2_thread()
576         
577         
578         if hasattr(signal, 'SIGALRM'):
579             def watchdog_handler(signum, frame):
580                 print 'Watchdog timer went off at:'
581                 traceback.print_stack()
582             
583             signal.signal(signal.SIGALRM, watchdog_handler)
584             task.LoopingCall(signal.alarm, 30).start(1)
585         
586         
587         def read_stale_frac(share):
588             if len(share.nonce) != 20:
589                 return None
590             a, b = struct.unpack("<HH", share.nonce[-4:])
591             if a != b:
592                 return None
593             return a/65535
594         
595         while True:
596             yield deferral.sleep(3)
597             try:
598                 if time.time() > current_work2.value['last_update'] + 60:
599                     print '''LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead'''
600                 if current_work.value['best_share_hash'] is not None:
601                     height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
602                     if height > 2:
603                         att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 120))
604                         weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
605                         shares, stale_shares = get_share_counts()
606                         print 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
607                             math.format(att_s),
608                             height,
609                             len(tracker.verified.shares),
610                             len(tracker.shares),
611                             weights.get(my_script, 0)/total_weight*100,
612                             math.format(weights.get(my_script, 0)/total_weight*att_s),
613                             shares,
614                             stale_shares,
615                             len(p2p_node.peers),
616                         ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
617                         fracs = [read_stale_frac(share) for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if read_stale_frac(share) is not None]
618                         if fracs:
619                             med = math.median(fracs)
620                             print 'Median stale proportion:', med
621                             if shares:
622                                 print '    Own:', stale_shares/shares
623                                 if med < .99:
624                                     print '    Own efficiency: %.02f%%' % (100*(1 - stale_shares/shares)/(1 - med),)
625                             
626                             
627             except:
628                 log.err()
629     except:
630         log.err(None, 'Fatal error:')
631     finally:
632         reactor.stop()
633
def run():
    """Command-line entry point: parse arguments, set up logging, start the reactor.

    Steps, in order:
      1. Build the argparse parser (arguments may also be read from a file
         given as ``@filename``, one or more whitespace-separated arguments
         per line) and parse ``sys.argv``.
      2. Redirect stdout/stderr (and twisted's default log observer) through
         a timestamping pipe that writes to both the original stderr and the
         log file selected by --logfile.
      3. If the platform has SIGUSR1, install a handler that closes and
         reopens the log file.
      4. Resolve the network object and fill in every port that was not
         given on the command line from that network's defaults.
      5. Convert --address (if given) to a pubkey hash.
      6. Schedule ``main(args)`` and run the reactor until it stops.

    Raises:
        ValueError: if --address was given but could not be converted to a
            pubkey hash for the selected network.
    """
    parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,), fromfile_prefix_chars='@')
    # Allow several arguments per line in an @file (argparse's default is one per line).
    parser.convert_arg_line_to_args = lambda arg_line: (arg for arg in arg_line.split() if arg.strip())
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(x for x in p2pool.nets if 'testnet' not in x), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--charts',
        help='generate charts on the web interface (requires PIL and pygame)',
        action='store_const', const=True, default=False, dest='charts')
    parser.add_argument('--logfile',
        help='log to specific file (defaults to <network_name>.log)',
        type=str, action='store', default=None, dest='logfile')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: bitcoin: 9332 namecoin: 9331 ixcoin: 9330 i0coin: 9329, +10000 for testnets)',
        type=int, action='store', default=None, dest='worker_port')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332, 8338 for ixcoin)',
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # The two positional arguments: RPC credentials, in this order.
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
        help='bitcoind RPC interface username',
        type=str, action='store', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool_init.DEBUG = True
    
    # Key into p2pool.nets, also used as the stem of the default log file name.
    net_key = args.net_name + ('_testnet' if args.testnet else '')
    
    if args.logfile is None:
        # Default log lives next to the script, named after the network.
        args.logfile = os.path.join(os.path.dirname(sys.argv[0]), net_key + '.log')
    
    class ReopeningFile(object):
        """File wrapper that remembers its open() arguments so it can be reopened."""
        def __init__(self, *open_args, **open_kwargs):
            self.open_args, self.open_kwargs = open_args, open_kwargs
            self.inner_file = open(*self.open_args, **self.open_kwargs)
        def reopen(self):
            # Close the current handle and open the same path again with the same arguments.
            self.inner_file.close()
            self.inner_file = open(*self.open_args, **self.open_kwargs)
        def write(self, data):
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class TeePipe(object):
        """Write-only file-like object that duplicates writes and flushes to every output."""
        def __init__(self, outputs):
            self.outputs = outputs
        def write(self, data):
            for output in self.outputs:
                output.write(data)
        def flush(self):
            for output in self.outputs:
                output.flush()
    class TimestampingPipe(object):
        """Write-only file-like object that prefixes each complete line with HH:MM:SS.ffffff."""
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.buf = ''  # trailing partial line not yet terminated by '\n'
            self.softspace = 0  # attribute the Python 2 print statement keeps on its target file
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            # Everything before the last '\n' is a complete line: emit it now.
            for line in lines[:-1]:
                self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
                self.inner_file.flush()
            # Whatever follows the last '\n' waits for the next write.
            self.buf = lines[-1]
        def flush(self):
            # Complete lines are flushed as they are written; a partial line is deliberately held back.
            pass
    # Open the file selected by --logfile (or its default computed above).
    logfile = ReopeningFile(args.logfile, 'w')
    # sys.stderr is captured here before being replaced, so the tee still reaches the real stderr.
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            # NOTE: the file is reopened in 'w' mode, so an existing file at that path is truncated.
            print('Caught SIGUSR1, closing %r...' % (args.logfile,))
            logfile.reopen()
            print('...and reopened %r after catching SIGUSR1.' % (args.logfile,))
        signal.signal(signal.SIGUSR1, sigusr1)
    
    args.net = p2pool.nets[net_key]
    
    # Any port not given on the command line falls back to the selected network's value.
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = args.net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = args.net.WORKER_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception as e:
            raise ValueError('error parsing address: ' + repr(e))
    else:
        # No address given; pubkey_hash stays None.
        args.pubkey_hash = None
    
    reactor.callWhenRunning(main, args)
    reactor.run()