unify naming of different nets and files
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import datetime
7 import itertools
8 import os
9 import random
10 import sqlite3
11 import struct
12 import sys
13 import time
14 import json
15 import signal
16 import traceback
17
18 from twisted.internet import defer, reactor, task, threads
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
24 from util import db, expiring_dict, jsonrpc, variable, deferral, math
25 from . import p2p, worker_interface, skiplists
26 import p2pool.data as p2pool
27 import p2pool as p2pool_init
28
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, ht):
    """Fetch the current getwork job from bitcoind and look up its height.
    
    Fires with a (BlockAttempt, height) tuple. Because a block could
    arrive between the RPC call and the height lookup, the previous
    block may be unknown to the height tracker; a placeholder height
    is used in that case.
    """
    raw_work = yield bitcoind.rpc_getwork()
    work = bitcoin.getwork.BlockAttempt.from_getwork(raw_work)
    try:
        height = ht.getHeight(work.previous_block)
    except ValueError:
        # previous block not (yet) known to the tracker
        height = 1000 # XXX
    defer.returnValue((work, height))
39
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind, via a checkorder message over bitcoin-p2p, for a
    payout script.
    
    Fires with the script on success, with None if the IP transaction
    was denied, and raises ValueError on any other reply.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    if reply == 'success':
        defer.returnValue(res['script'])
    if reply == 'denied':
        defer.returnValue(None)
    raise ValueError('Unexpected reply: %r' % (res,))
50
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Fall back to paying bitcoind's 'p2pool' account address.
    
    Fires with the script2 form of the address's pubkey hash.
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
55
@defer.inlineCallbacks
def main(args):
    """Run a p2pool node until the reactor stops.
    
    Connects to bitcoind over JSON-RPC and bitcoin-p2p, determines the
    payout script, loads the cached share chain from disk, joins the
    p2pool network, serves work to miners over a JSON-RPC worker
    interface, and then loops forever printing pool statistics.
    Always stops the reactor on the way out.
    """
    try:
        if args.charts:
            from . import draw # imported lazily: requires PIL and pygame
        
        print 'p2pool (version %s)' % (p2pool_init.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getwork
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield args.net.BITCOIN_RPC_CHECK(bitcoind)
        if not good:
            print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork()))
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work.previous_block,)
        print
        
        # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin.p2p.ClientFactory(args.net)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        my_script = yield get_payout_script(factory)
        if args.pubkey_hash is None:
            if my_script is None:
                print '    IP transaction denied ... falling back to sending to address.'
                my_script = yield get_payout_script2(bitcoind, args.net)
        else:
            # explicit -a/--address wins over anything bitcoind offered
            my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', my_script.encode('hex')
        print
        
        print 'Loading cached block headers...'
        ht = bitcoin.p2p.HeightTracker(factory, args.net.NAME + '_headers.dat')
        print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
        print
        
        # load the persisted share chain and mark which shares were verified
        tracker = p2pool.OkayTracker(args.net)
        ss = p2pool.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), args.net.NAME + '_shares.'), args.net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                contents.shared = True
                contents.stored = True
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        # keep the on-disk store in sync with the in-memory tracker
        tracker.added.watch(lambda share: threads.deferToThread(ss.add_share, share))
        tracker.verified.added.watch(lambda share: threads.deferToThread(ss.add_verified_hash, share.hash))
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        work_updated = variable.Event()
        
        # share_hash -> (last request time, request count), for backoff
        requested = expiring_dict.ExpiringDict(300)
        
        @defer.inlineCallbacks
        def set_real_work1():
            # refresh current_work/current_work2 from bitcoind's getwork
            work, height = yield getwork(bitcoind, ht)
            changed = work.previous_block != current_work.value['previous_block'] if current_work.value is not None else True
            current_work.set(dict(
                version=work.version,
                previous_block=work.previous_block,
                target=work.target,
                height=height,
                best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
            ))
            current_work2.set(dict(
                clock_offset=time.time() - work.timestamp,
                last_update=time.time(),
            ))
            if changed:
                set_real_work2()
        
        def set_real_work2():
            # recompute the best share and request any desired missing shares
            best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue # requested recently; back off exponentially
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    # mostly pick a random peer that knows the chain; sometimes retry the announcer
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        
        print 'Initializing work...'
        yield set_real_work1()
        set_real_work2()
        print '    ...success!'
        print
        
        start_time = time.time() - current_work2.value['clock_offset']
        
        # setup p2p logic and join p2pool network
        
        def share_share(share, ignore_peer=None):
            # broadcast a share to every connected peer except ignore_peer
            for peer in p2p_node.peers.itervalues():
                if peer is ignore_peer:
                    continue
                #if p2pool_init.DEBUG:
                #    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
                peer.send_shares([share])
            share.flag_shared()
        
        def p2p_shares(shares, peer=None):
            # handle shares received from the network (or submitted locally)
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            some_new = False
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
                    continue
                some_new = True
                
                #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
                #for peer2, share_hash in desired:
                #    print 'Requesting parent share %x' % (share_hash,)
                #    peer2.send_getshares(hashes=[share_hash], parents=2000)
                
                if share.bitcoin_hash <= share.header['target']:
                    # the share also satisfies the bitcoin block target - submit it
                    print
                    print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
                    print
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=share.as_block(tracker, args.net))
                    else:
                        print 'No bitcoind connection! Erp!'
            
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if some_new:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
        
        def p2p_share_hashes(share_hashes, peer):
            # handle share-hash announcements; request any we don't have yet
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue # requested recently; back off exponentially
                print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
        
        def p2p_get_shares(share_hashes, parents, stops, peer):
            # serve a peer's getshares request, capped at ~1000 shares total
            parents = min(parents, 1000//len(share_hashes))
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
                    if share.hash in stops:
                        break
                    shares.append(share)
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.send_shares(shares, full=True)
        
        print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
        
        def parse(x):
            # parse 'addr' or 'addr:port' into an (ip, port) tuple
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, args.net.P2P_PORT
        
        # bootstrap peers: hard-coded IPs plus DNS-resolved hostnames
        nodes = set([
            ('72.14.191.28', args.net.P2P_PORT),
            ('62.204.197.159', args.net.P2P_PORT),
            ('142.58.248.28', args.net.P2P_PORT),
            ('94.23.34.145', args.net.P2P_PORT),
        ])
        for host in [
            'p2pool.forre.st',
            'dabuttonfactory.com',
        ]:
            try:
                nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
            except:
                log.err(None, 'Error resolving bootstrap node IP:')
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=args.net,
            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.NAME),
            mode=0 if args.low_bandwidth else 1,
            preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
        )
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            for share in tracker.get_chain_known(new_work['best_share_hash']):
                if share.shared:
                    break
                share_share(share, share.peer)
        current_work.changed.watch(work_changed)
        
        print '    ...success!'
        print
        
        @defer.inlineCallbacks
        def upnp_thread():
            # periodically (re)establish a UPnP mapping for the p2p port
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except:
                    if p2pool_init.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        # random per-run tag embedded in coinbase nonces so our own shares
        # can be recognized when counting stales
        run_identifier = struct.pack('<Q', random.randrange(2**64))
        
        share_counter = skiplists.CountsSkipList(tracker, run_identifier)
        removed_unstales = set()
        def get_share_counts():
            # returns (total own shares, own stale shares) for the current chain
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            matching_in_chain = share_counter(current_work.value['best_share_hash'], height) | removed_unstales
            shares_in_chain = my_shares & matching_in_chain
            stale_shares = my_shares - matching_in_chain
            return len(shares_in_chain) + len(stale_shares), len(stale_shares)
        @tracker.verified.removed.watch
        def _(share):
            # remember our shares that fell off the chain so they still count
            if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_unstales.add(share.hash)
        
        def compute(state, payout_script):
            # build a getwork job for a miner from the current p2pool state
            if payout_script is None:
                payout_script = my_script
            if state['best_share_hash'] is None and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if len(p2p_node.peers) == 0 and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
            pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
            extra_txs = []
            size = 0
            for tx in pre_extra_txs:
                this_size = len(bitcoin.data.tx_type.pack(tx.tx))
                if size + this_size > 500000:
                    break
                extra_txs.append(tx)
                size += this_size
            # XXX check sigops!
            # XXX assuming generate_tx is smallish here..
            def get_stale_frac():
                # encode our stale fraction as a uint16 repeated twice (4 bytes)
                shares, stale_shares = get_share_counts()
                if shares == 0:
                    return ""
                frac = stale_shares/shares
                return 2*struct.pack('<H', int(65535*frac + .5))
            generate_tx = p2pool.generate_transaction(
                tracker=tracker,
                previous_share_hash=state['best_share_hash'],
                new_script=payout_script,
                subsidy=args.net.BITCOIN_SUBSIDY_FUNC(state['height']) + sum(tx.value_in - tx.value_out for tx in extra_txs),
                nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)) + get_stale_frac(),
                block_target=state['target'],
                net=args.net,
            )
            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8, args.net.BITCOIN_SYMBOL)
            #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + [tx.tx for tx in extra_txs]
            merkle_root = bitcoin.data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = transactions # retained until the ExpiringDict(300) entry above expires
            
            timestamp = int(time.time() - current_work2.value['clock_offset'])
            if state['best_share_hash'] is not None:
                # never go below the median timestamp of the last 11 shares
                timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
                if timestamp2 > timestamp:
                    print 'Toff', timestamp2 - timestamp
                    timestamp = timestamp2
            target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
            #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
        
        my_shares = set() # hashes of shares we generated
        times = {} # nonce -> time the corresponding work was handed out
        
        def got_response(data):
            # handle a getwork submission from a miner; returns True if the
            # share was accepted on the current best chain
            try:
                # match up with transactions
                header = bitcoin.getwork.decode_data(data)
                transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
                if transactions is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                block = dict(header=header, txs=transactions)
                hash_ = bitcoin.data.block_header_type.hash256(block['header'])
                if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=block)
                    else:
                        print 'No bitcoind connection! Erp!'
                    if hash_ <= block['header']['target']:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                        print
                target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
                if hash_ > target:
                    print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (hash_, target)
                    return False
                share = p2pool.Share.from_block(block)
                my_shares.add(share.hash)
                print 'GOT SHARE! %s prev %s age %.2fs' % (p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
        
        web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
        
        def get_rate():
            # JSON: pool attempts/second over the last <=720 shares
            if current_work.value['best_share_hash'] is not None:
                height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
                return json.dumps(att_s)
            return json.dumps(None)
        
        def get_users():
            # JSON: payout weight fraction per miner address
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
            return json.dumps(res)
        
        class WebInterface(resource.Resource):
            # minimal twisted resource serving func() with a fixed MIME type
            def __init__(self, func, mime_type):
                self.func, self.mime_type = func, mime_type
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                return self.func()
        
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        if args.charts:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        # done!
        
        tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
        get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
        
        class Tx(object):
            # wrapper around a mempool transaction that tracks its input and
            # output values and whether all of its inputs are confirmed
            def __init__(self, tx, seen_at_block):
                self.hash = bitcoin.data.tx_type.hash256(tx)
                self.tx = tx
                self.seen_at_block = seen_at_block
                self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                #print
                #print '%x %r' % (seen_at_block, tx)
                #for mention in self.mentions:
                #    print '%x' % mention
                #print
                self.parents_all_in_blocks = False
                self.value_in = 0
                #print self.tx
                self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
                self._find_parents_in_blocks()
            
            @defer.inlineCallbacks
            def _find_parents_in_blocks(self):
                # asynchronously sum value_in and flag when every input is in a block
                for tx_in in self.tx['tx_ins']:
                    try:
                        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
                    except Exception:
                        return
                    self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
                    #print raw_transaction
                    if not raw_transaction['parent_blocks']:
                        return
                self.parents_all_in_blocks = True
            
            def is_good(self):
                if not self.parents_all_in_blocks:
                    return False
                # NOTE(review): is_good2 is not defined in this class; this
                # path appears unreachable while the new_tx feed below stays
                # disabled - confirm before re-enabling it
                x = self.is_good2()
                #print 'is_good:', x
                return x
        
        @defer.inlineCallbacks
        def new_tx(tx_hash):
            # fetch a newly-announced tx from bitcoind and add it to tx_pool
            try:
                assert isinstance(tx_hash, (int, long))
                #print 'REQUESTING', tx_hash
                tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
                #print 'GOT', tx
                tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
            except:
                log.err(None, 'Error handling tx:')
        # disable for now, for testing impact on stales
        #factory.new_tx.watch(new_tx)
        
        def new_block(block_hash):
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        ht.updated.watch(set_real_work2)
        
        @defer.inlineCallbacks
        def work1_thread():
            # poll bitcoind for new work every 1-10s, or sooner on work_updated
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
        
        @defer.inlineCallbacks
        def work2_thread():
            # periodically recompute the best share chain
            while True:
                try:
                    set_real_work2()
                except:
                    log.err()
                yield deferral.sleep(random.expovariate(1/20))
        
        work1_thread()
        work2_thread()
        
        
        if hasattr(signal, 'SIGALRM'):
            # watchdog: SIGALRM fires if the alarm isn't re-armed for 30s
            def watchdog_handler(signum, frame):
                print 'Watchdog timer went off at:'
                # NOTE(review): print_exc prints the current exception (usually
                # none inside a signal handler); traceback.print_stack(frame)
                # was probably intended here - confirm
                traceback.print_exc()
            
            signal.signal(signal.SIGALRM, watchdog_handler)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        
        def read_stale_frac(share):
            # decode the stale fraction embedded by get_stale_frac, if present
            if len(share.nonce) != 20:
                return None
            a, b = struct.unpack("<HH", share.nonce[-4:])
            if a != b:
                return None # not our duplicated-uint16 marker
            return a/65535
        
        # status loop: print pool statistics every few seconds until shutdown
        while True:
            yield deferral.sleep(3)
            try:
                if current_work.value['best_share_hash'] is not None:
                    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                    if height > 2:
                        att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 120))
                        weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
                        shares, stale_shares = get_share_counts()
                        print 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
                            math.format(att_s),
                            height,
                            len(tracker.verified.shares),
                            len(tracker.shares),
                            weights.get(my_script, 0)/total_weight*100,
                            math.format(weights.get(my_script, 0)/total_weight*att_s),
                            shares,
                            stale_shares,
                            len(p2p_node.peers),
                        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
                        fracs = [read_stale_frac(share) for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if read_stale_frac(share) is not None]
                        if fracs:
                            med = math.median(fracs)
                            print 'Median stale proportion:', med
                            if shares:
                                print '    Own:', stale_shares/shares
                                if med < .99:
                                    print '    Own efficiency: %.02f%%' % (100*(1 - stale_shares/shares)/(1 - med),)
                            
                            
            except:
                log.err()
    except:
        log.err(None, 'Fatal error:')
    finally:
        reactor.stop()
631
def run():
    """Command-line entry point: parse arguments, normalize them, start the node.

    Builds the argument parser, optionally enables debug logging (teeing
    timestamped output to a rotatable 'debug.log'), resolves the network
    object and port defaults from the chosen coin/testnet combination,
    parses the payout address if given, then schedules main() and runs
    the Twisted reactor (blocks until the reactor stops).
    """
    parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,), fromfile_prefix_chars='@')
    # fromfile_prefix_chars='@' lets users pass @argsfile; this override allows
    # several whitespace-separated arguments per line in that file instead of
    # argparse's default of one argument per line.
    parser.convert_arg_line_to_args = lambda arg_line: (arg for arg in arg_line.split() if arg.strip())
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    parser.add_argument('--net',
        help='use specified network (choices: bitcoin (default), namecoin, ixcoin)',
        action='store', choices=set(['bitcoin', 'namecoin', 'ixcoin']), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='use the testnet',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--charts',
        help='generate charts on the web interface (requires PIL and pygame)',
        action='store_const', const=True, default=False, dest='charts')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
        type=int, action='store', default=9332, dest='worker_port')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332, 8338 for ixcoin)',
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # Positional (required) arguments: RPC credentials for bitcoind.
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
        help='bitcoind RPC interface username',
        type=str, action='store', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool_init.DEBUG = True
        class ReopeningFile(object):
            """File-like object that can close and reopen its underlying file,
            so an external log-rotation tool can move 'debug.log' aside and
            signal us (SIGUSR1 below) to start a fresh file."""
            def __init__(self, *open_args, **open_kwargs):
                self.open_args, self.open_kwargs = open_args, open_kwargs
                self.inner_file = open(*self.open_args, **self.open_kwargs)
            def reopen(self):
                self.inner_file.close()
                self.inner_file = open(*self.open_args, **self.open_kwargs)
            def write(self, data):
                self.inner_file.write(data)
            def flush(self):
                self.inner_file.flush()
        class TeePipe(object):
            """Write-only file-like object that duplicates every write/flush
            to each of several output file-like objects."""
            def __init__(self, outputs):
                self.outputs = outputs
            def write(self, data):
                for output in self.outputs:
                    output.write(data)
            def flush(self):
                for output in self.outputs:
                    output.flush()
        class TimestampingPipe(object):
            """Write-only file-like wrapper that prefixes each complete line
            with a wall-clock timestamp. Partial lines are buffered until
            their newline arrives, so interleaved writes still timestamp
            whole lines."""
            def __init__(self, inner_file):
                self.inner_file = inner_file
                self.buf = ''
                # Python 2's print statement reads/writes .softspace on the
                # stream it targets, so a stdout replacement must provide it.
                self.softspace = 0
            def write(self, data):
                buf = self.buf + data
                lines = buf.split('\n')
                for line in lines[:-1]:
                    self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
                    self.inner_file.flush()
                self.buf = lines[-1]
            def flush(self):
                # Intentionally a no-op: each completed line is flushed in
                # write(), and the partial-line buffer must not be emitted
                # without its timestamp/newline.
                pass
        # Tee all output (stdout, stderr, and Twisted's log observer) through
        # the timestamper to both the original stderr and debug.log, which
        # lives next to the script itself.
        logfile = ReopeningFile(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')
        sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
        # SIGUSR1 (not available on Windows, hence the hasattr guard) triggers
        # a reopen of debug.log to support log rotation.
        if hasattr(signal, "SIGUSR1"):
            def sigusr1(signum, frame):
                print '''Caught SIGUSR1, closing 'debug.log'...'''
                logfile.reopen()
                print '''...and reopened 'debug.log' after catching SIGUSR1.'''
            signal.signal(signal.SIGUSR1, sigusr1)
    
    # Resolve the (coin, testnet) pair to its network-parameters object,
    # which supplies the port defaults used below.
    args.net = {
        ('bitcoin', False): p2pool.Mainnet,
        ('bitcoin', True): p2pool.Testnet,
        ('namecoin', False): p2pool.NamecoinMainnet,
        ('namecoin', True): p2pool.NamecoinTestnet,
        ('ixcoin', False): p2pool.IxcoinMainnet,
        ('ixcoin', True): p2pool.IxcoinTestnet,
    }[args.net_name, args.testnet]
    
    # Fill in any ports the user didn't specify from the network's defaults.
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = args.net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    # Validate the payout address now (against the selected network) so a typo
    # fails fast at startup; None means main() will request one from bitcoind.
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception, e:
            raise ValueError('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    # Defer main() until the reactor is running, then block in the event loop.
    reactor.callWhenRunning(main, args)
    reactor.run()