don't include subsidy in estimated payout
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import datetime
7 import itertools
8 import os
9 import random
10 import sqlite3
11 import struct
12 import sys
13 import time
14 import json
15 import signal
16 import traceback
17
18 from twisted.internet import defer, reactor, task, threads
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
24 from util import db, expiring_dict, jsonrpc, variable, deferral, math
25 from . import p2p, worker_interface, skiplists
26 import p2pool.data as p2pool
27 import p2pool as p2pool_init
28
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, ht):
    """Fetch the current block template from bitcoind over JSON-RPC.

    Returns a deferred firing with (BlockAttempt, height), where height is
    looked up from the cached header tracker. Note that a block could
    arrive in between the getwork call and the height lookup.
    """
    raw = yield bitcoind.rpc_getwork()
    work = bitcoin.getwork.BlockAttempt.from_getwork(raw)
    try:
        height = ht.getHeight(work.previous_block)
    except ValueError:
        # previous block not known to the header tracker yet -- fall back
        height = 1000 # XXX
    defer.returnValue((work, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind, over its P2P connection, for a payout script by
    submitting a null checkorder message.

    Returns a deferred firing with the script on success, None when the
    IP transaction is denied, and errs with ValueError on any other reply.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    if reply == 'success':
        defer.returnValue(res['script'])
    if reply == 'denied':
        defer.returnValue(None)
    raise ValueError('Unexpected reply: %r' % (res,))
50
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Fallback payout script: pay to the address of bitcoind's 'p2pool' account."""
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
@defer.inlineCallbacks
def main(args):
    """Run the p2pool node until the process is killed.

    Connects to bitcoind over both JSON-RPC and the bitcoin P2P protocol,
    loads cached shares from disk, joins the p2pool share-chain network,
    serves getwork to miners over HTTP/JSON-RPC, and then loops forever
    printing pool statistics. Stops the twisted reactor on the way out.

    args: parsed command-line namespace produced by run().
    """
    try:
        if args.charts:
            from . import draw
        
        print 'p2pool (version %s)' % (p2pool_init.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getwork
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(args.net.BITCOIN_RPC_CHECK)(bitcoind)
        if not good:
            print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield deferral.retry('Error while testing getwork:', 1)(defer.inlineCallbacks(lambda: defer.returnValue(bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork())))))()
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work.previous_block,)
        print
        
        # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin.p2p.ClientFactory(args.net)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        my_script = yield get_payout_script(factory)
        if args.pubkey_hash is None:
            if my_script is None:
                print '    IP transaction denied ... falling back to sending to address.'
                my_script = yield get_payout_script2(bitcoind, args.net)
        else:
            # an explicitly supplied pubkey hash overrides whatever bitcoind offered
            my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', my_script.encode('hex')
        print
        
        print 'Loading cached block headers...'
        ht = bitcoin.p2p.HeightTracker(factory, args.net.NAME + '_headers.dat')
        print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
        print
        
        tracker = p2pool.OkayTracker(args.net)
        ss = p2pool.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), args.net.NAME + '_shares.'), args.net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                # shares loaded from disk are already shared/stored, so mark them
                # to avoid re-broadcasting or re-saving them
                contents.shared = True
                contents.stored = True
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                # a verified hash without a matching share is stale on-disk state
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        # keep the on-disk share store in sync with the in-memory tracker
        tracker.added.watch(lambda share: ss.add_share(share))
        tracker.verified.added.watch(lambda share: ss.add_verified_hash(share.hash))
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        work_updated = variable.Event()
        
        # share_hash -> (last_request_time, request_count), for request backoff
        requested = expiring_dict.ExpiringDict(300)
        
        @defer.inlineCallbacks
        def set_real_work1():
            """Poll bitcoind for new work and refresh current_work/current_work2;
            triggers set_real_work2() when the previous block changed."""
            work, height = yield getwork(bitcoind, ht)
            changed = work.previous_block != current_work.value['previous_block'] if current_work.value is not None else True
            current_work.set(dict(
                version=work.version,
                previous_block=work.previous_block,
                target=work.target,
                height=height,
                best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
            ))
            current_work2.set(dict(
                clock_offset=time.time() - work.timestamp,
                last_update=time.time(),
            ))
            if changed:
                set_real_work2()
        
        def set_real_work2():
            """Recompute the best share to build on and request missing parent shares."""
            best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                # exponential backoff on re-requesting the same share hash
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                # prefer peers known to have heads descending from this share
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        
        print 'Initializing work...'
        yield set_real_work1()
        set_real_work2()
        print '    ...success!'
        print
        
        start_time = time.time() - current_work2.value['clock_offset']
        
        # setup p2p logic and join p2pool network
        
        def share_share(share, ignore_peer=None):
            """Broadcast a share to every connected peer except ignore_peer."""
            for peer in p2p_node.peers.itervalues():
                if peer is ignore_peer:
                    continue
                #if p2pool_init.DEBUG:
                #    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
                peer.send_shares([share])
            share.flag_shared()
        
        def p2p_shares(shares, peer=None):
            """Handle shares received from the network (or from our own workers)."""
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            some_new = False
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
                    continue
                some_new = True
                
                #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
                #for peer2, share_hash in desired:
                #    print 'Requesting parent share %x' % (share_hash,)
                #    peer2.send_getshares(hashes=[share_hash], parents=2000)
                
                # a share that also satisfies the bitcoin target is a full block
                if share.bitcoin_hash <= share.header['target']:
                    print
                    print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
                    print
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=share.as_block(tracker, args.net))
                    else:
                        print 'No bitcoind connection! Erp!'
            
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if some_new:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
        
        def p2p_share_hashes(share_hashes, peer):
            """Handle advertised share hashes: request any we don't already have."""
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                # same backoff rule as in set_real_work2
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
        
        def p2p_get_shares(share_hashes, parents, stops, peer):
            """Serve a peer's getshares request: send chains rooted at share_hashes."""
            parents = min(parents, 1000//len(share_hashes)) # cap total shares sent per request
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
                    if share.hash in stops:
                        break
                    shares.append(share)
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.send_shares(shares, full=True)
        
        print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
        
        def parse(x):
            """Parse a 'host' or 'host:port' string into an (ip, port) tuple."""
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, args.net.P2P_PORT
        
        # hard-coded bootstrap nodes, plus DNS-resolved ones below
        nodes = set([
            ('72.14.191.28', args.net.P2P_PORT),
            ('62.204.197.159', args.net.P2P_PORT),
            ('142.58.248.28', args.net.P2P_PORT),
            ('94.23.34.145', args.net.P2P_PORT),
        ])
        for host in [
            'p2pool.forre.st',
            'dabuttonfactory.com',
        ]:
            try:
                nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
            except:
                log.err(None, 'Error resolving bootstrap node IP:')
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=args.net,
            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.NAME),
            mode=0 if args.low_bandwidth else 1,
            preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
        )
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # send share when the chain changes to their chain
        def work_changed(new_work):
            """Broadcast our not-yet-shared shares at the tip of the new best chain."""
            #print 'Work changed:', new_work
            for share in tracker.get_chain_known(new_work['best_share_hash']):
                if share.shared:
                    break
                share_share(share, share.peer)
        current_work.changed.watch(work_changed)
        
        print '    ...success!'
        print
        
        @defer.inlineCallbacks
        def upnp_thread():
            """Periodically (re-)establish a UPnP port mapping for the p2p port."""
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except:
                    if p2pool_init.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        # merkle root -> transaction list of work we handed out, so submissions
        # can be matched back up with their transactions
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        # random per-run tag embedded in coinbase nonces so this node can
        # recognize its own shares in the chain
        run_identifier = struct.pack('<Q', random.randrange(2**64))
        
        share_counter = skiplists.CountsSkipList(tracker, run_identifier)
        removed_unstales = set()
        def get_share_counts():
            """Return (total own shares, own stale shares) for the current best chain."""
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            matching_in_chain = share_counter(current_work.value['best_share_hash'], height) | removed_unstales
            shares_in_chain = my_shares & matching_in_chain
            stale_shares = my_shares - matching_in_chain
            return len(shares_in_chain) + len(stale_shares), len(stale_shares)
        @tracker.verified.removed.watch
        def _(share):
            # remember own shares that fell off the chain end so they still count as non-stale
            if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_unstales.add(share.hash)
        
        def compute(state, payout_script):
            """Build a getwork BlockAttempt for a worker.

            state is a current_work snapshot; payout_script (possibly None)
            is the script the worker asked to be paid to. Raises
            jsonrpc.Error when work cannot safely be handed out.
            """
            if payout_script is None:
                payout_script = my_script
            if state['best_share_hash'] is None and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if len(p2p_node.peers) == 0 and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            # select mempool transactions to include, limited by count and size
            pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
            pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
            extra_txs = []
            size = 0
            for tx in pre_extra_txs:
                this_size = len(bitcoin.data.tx_type.pack(tx.tx))
                if size + this_size > 500000:
                    break
                extra_txs.append(tx)
                size += this_size
            # XXX check sigops!
            # XXX assuming generate_tx is smallish here..
            def get_stale_frac():
                # encode own stale fraction as a doubled little-endian uint16
                # (4 bytes; decoded by read_stale_frac, which checks the halves match)
                shares, stale_shares = get_share_counts()
                if shares == 0:
                    return ""
                frac = stale_shares/shares
                return 2*struct.pack('<H', int(65535*frac + .5))
            # block subsidy plus fees of the included transactions
            subsidy = args.net.BITCOIN_SUBSIDY_FUNC(state['height']) + sum(tx.value_in - tx.value_out for tx in extra_txs)
            generate_tx = p2pool.generate_transaction(
                tracker=tracker,
                previous_share_hash=state['best_share_hash'],
                new_script=payout_script,
                subsidy=subsidy,
                nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)) + get_stale_frac(),
                block_target=state['target'],
                net=args.net,
            )
            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], (generate_tx['tx_outs'][-1]['value']-subsidy//200)*1e-8, args.net.BITCOIN_SYMBOL)
            #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + [tx.tx for tx in extra_txs]
            merkle_root = bitcoin.data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
            
            timestamp = int(time.time() - current_work2.value['clock_offset'])
            if state['best_share_hash'] is not None:
                # don't let the timestamp fall below (median of last 11 share timestamps) + 1
                timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
                if timestamp2 > timestamp:
                    print 'Toff', timestamp2 - timestamp
                    timestamp = timestamp2
            target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
            #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
        
        my_shares = set() # hashes of shares produced by this node's workers
        times = {} # share nonce -> time the work was handed out (for age reporting)
        
        def got_response(data, user):
            """Handle a getwork submission from a worker; return True if accepted."""
            try:
                # match up with transactions
                header = bitcoin.getwork.decode_data(data)
                transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
                if transactions is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                block = dict(header=header, txs=transactions)
                hash_ = bitcoin.data.block_header_type.hash256(block['header'])
                if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=block)
                    else:
                        print 'No bitcoind connection! Erp!'
                    if hash_ <= block['header']['target']:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                        print
                target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
                if hash_ > target:
                    print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (hash_, target)
                    return False
                share = p2pool.Share.from_block(block)
                my_shares.add(share.hash)
                print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
        
        web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
        
        def get_rate():
            """JSON endpoint: pool attempts/second over up to the last 720 shares."""
            if current_work.value['best_share_hash'] is not None:
                height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
                return json.dumps(att_s)
            return json.dumps(None)
        
        def get_users():
            """JSON endpoint: payout script (human form) -> fraction of recent pool weight."""
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
            return json.dumps(res)
        
        class WebInterface(resource.Resource):
            """Minimal twisted resource serving func() with a fixed MIME type."""
            def __init__(self, func, mime_type):
                self.func, self.mime_type = func, mime_type
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                return self.func()
        
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        if args.charts:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        # done!
        
        tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
        get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
        
        class Tx(object):
            """A mempool transaction plus the metadata used to decide inclusion."""
            def __init__(self, tx, seen_at_block):
                self.hash = bitcoin.data.tx_type.hash256(tx)
                self.tx = tx
                self.seen_at_block = seen_at_block
                # own hash plus all input hashes, for relating transactions
                self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                #print
                #print '%x %r' % (seen_at_block, tx)
                #for mention in self.mentions:
                #    print '%x' % mention
                #print
                self.parents_all_in_blocks = False
                self.value_in = 0
                #print self.tx
                self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
                self._find_parents_in_blocks()
            
            @defer.inlineCallbacks
            def _find_parents_in_blocks(self):
                # asynchronously resolve inputs; on full success sets
                # parents_all_in_blocks and accumulates value_in
                for tx_in in self.tx['tx_ins']:
                    try:
                        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
                    except Exception:
                        return
                    self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
                    #print raw_transaction
                    if not raw_transaction['parent_blocks']:
                        return
                self.parents_all_in_blocks = True
            
            def is_good(self):
                if not self.parents_all_in_blocks:
                    return False
                # NOTE(review): is_good2 is not defined in this class -- presumably
                # defined elsewhere; verify (tx_pool feeding is disabled below)
                x = self.is_good2()
                #print 'is_good:', x
                return x
        
        @defer.inlineCallbacks
        def new_tx(tx_hash):
            """Fetch an announced transaction from bitcoind and add it to tx_pool."""
            try:
                assert isinstance(tx_hash, (int, long))
                #print 'REQUESTING', tx_hash
                tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
                #print 'GOT', tx
                tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
            except:
                log.err(None, 'Error handling tx:')
        # disable for now, for testing impact on stales
        #factory.new_tx.watch(new_tx)
        
        def new_block(block_hash):
            # a new bitcoin block arrived: wake up work1_thread immediately
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        ht.updated.watch(set_real_work2)
        
        @defer.inlineCallbacks
        def work1_thread():
            """Refresh work from bitcoind on new-block events or every 1-10 seconds."""
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
        
        @defer.inlineCallbacks
        def work2_thread():
            """Periodically re-evaluate the best share chain."""
            while True:
                try:
                    set_real_work2()
                except:
                    log.err()
                yield deferral.sleep(random.expovariate(1/20))
        
        work1_thread()
        work2_thread()
        
        
        if hasattr(signal, 'SIGALRM'):
            # watchdog: re-arm a 30s alarm every second; if the reactor stalls
            # for 30s the alarm fires and dumps a stack trace
            def watchdog_handler(signum, frame):
                print 'Watchdog timer went off at:'
                traceback.print_stack()
            
            signal.signal(signal.SIGALRM, watchdog_handler)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        
        def read_stale_frac(share):
            """Decode the stale fraction embedded in a share's 20-byte nonce by
            get_stale_frac, or return None if absent/invalid."""
            if len(share.nonce) != 20:
                return None
            a, b = struct.unpack("<HH", share.nonce[-4:])
            if a != b:
                return None
            return a/65535
        
        # main status loop: print pool statistics every few seconds
        while True:
            yield deferral.sleep(3)
            try:
                if time.time() > current_work2.value['last_update'] + 60:
                    print '''LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead'''
                if current_work.value['best_share_hash'] is not None:
                    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                    if height > 2:
                        att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 120))
                        weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
                        shares, stale_shares = get_share_counts()
                        print 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
                            math.format(att_s),
                            height,
                            len(tracker.verified.shares),
                            len(tracker.shares),
                            weights.get(my_script, 0)/total_weight*100,
                            math.format(weights.get(my_script, 0)/total_weight*att_s),
                            shares,
                            stale_shares,
                            len(p2p_node.peers),
                        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
                        fracs = [read_stale_frac(share) for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if read_stale_frac(share) is not None]
                        if fracs:
                            med = math.median(fracs)
                            print 'Median stale proportion:', med
                            if shares:
                                print '    Own:', stale_shares/shares
                                if med < .99:
                                    print '    Own efficiency: %.02f%%' % (100*(1 - stale_shares/shares)/(1 - med),)
                            
                            
            except:
                log.err()
    except:
        log.err(None, 'Fatal error:')
    finally:
        reactor.stop()
635
636 def run():
637     class FixedArgumentParser(argparse.ArgumentParser):
638         def _read_args_from_files(self, arg_strings):
639             # expand arguments referencing files
640             new_arg_strings = []
641             for arg_string in arg_strings:
642                 
643                 # for regular arguments, just add them back into the list
644                 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
645                     new_arg_strings.append(arg_string)
646                 
647                 # replace arguments referencing files with the file content
648                 else:
649                     try:
650                         args_file = open(arg_string[1:])
651                         try:
652                             arg_strings = []
653                             for arg_line in args_file.read().splitlines():
654                                 for arg in self.convert_arg_line_to_args(arg_line):
655                                     arg_strings.append(arg)
656                             arg_strings = self._read_args_from_files(arg_strings)
657                             new_arg_strings.extend(arg_strings)
658                         finally:
659                             args_file.close()
660                     except IOError:
661                         err = sys.exc_info()[1]
662                         self.error(str(err))
663             
664             # return the modified argument list
665             return new_arg_strings
666     
    # Command-line parser; '@file' arguments are expanded from files, which is
    # how saved configuration is passed in.
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,), fromfile_prefix_chars='@')
    # Split each config-file line on whitespace so one line can carry several
    # tokens. (The .strip() filter is redundant - str.split() already drops
    # empty pieces - but harmless.)
    parser.convert_arg_line_to_args = lambda arg_line: (arg for arg in arg_line.split() if arg.strip())
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    # network selection: testnet variants are offered via --testnet, not --net
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(x for x in p2pool.nets if 'testnet' not in x), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--charts',
        help='generate charts on the web interface (requires PIL and pygame)',
        action='store_const', const=True, default=False, dest='charts')
    parser.add_argument('--logfile',
        help='''log to specific file (defaults to <network_name>.log in run_p2pool.py's directory)''',
        type=str, action='store', default=None, dest='logfile')
    
    # --- p2pool peer-to-peer interface options ---
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
        action='store_false', default=True, dest='upnp')
    
    # --- miner-facing RPC interface options ---
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: bitcoin: 9332 namecoin: 9331 ixcoin: 9330 i0coin: 9329, +10000 for testnets)',
        type=int, action='store', default=None, dest='worker_port')
    
    # --- upstream bitcoind connection options ---
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332, 8338 for ixcoin)',
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # positional credentials: username is optional (nargs='?'), password required
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: empty)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    # --debug flips the module-level flag read throughout the program
    if args.debug:
        p2pool_init.DEBUG = True
    
    # default log path: <network>[_testnet].log beside the launch script
    if args.logfile is None:
       # NOTE(review): 7-space indent - legal (single-statement suite) but worth normalizing
       args.logfile = os.path.join(os.path.dirname(sys.argv[0]), args.net_name + ('_testnet' if args.testnet else '') + '.log')
733     
734     class ReopeningFile(object):
735         def __init__(self, *open_args, **open_kwargs):
736             self.open_args, self.open_kwargs = open_args, open_kwargs
737             self.inner_file = open(*self.open_args, **self.open_kwargs)
738         def reopen(self):
739             self.inner_file.close()
740             self.inner_file = open(*self.open_args, **self.open_kwargs)
741         def write(self, data):
742             self.inner_file.write(data)
743         def flush(self):
744             self.inner_file.flush()
745     class TeePipe(object):
746         def __init__(self, outputs):
747             self.outputs = outputs
748         def write(self, data):
749             for output in self.outputs:
750                 output.write(data)
751         def flush(self):
752             for output in self.outputs:
753                 output.flush()
754     class TimestampingPipe(object):
755         def __init__(self, inner_file):
756             self.inner_file = inner_file
757             self.buf = ''
758             self.softspace = 0
759         def write(self, data):
760             buf = self.buf + data
761             lines = buf.split('\n')
762             for line in lines[:-1]:
763                 self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
764                 self.inner_file.flush()
765             self.buf = lines[-1]
766         def flush(self):
767             pass
    # Log file; mode 'w' truncates any previous log on startup.
    logfile = ReopeningFile(args.logfile, 'w')
    # Mirror everything written to stdout/stderr (including twisted's log
    # observer) to both the real stderr and the logfile, timestamped per line.
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
    if hasattr(signal, "SIGUSR1"):
        # SIGUSR1 (where available) reopens the log so external rotation
        # tools can move the old file aside.
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    
    # Resolve the network definition; --testnet selects the '_testnet' variant.
    args.net = p2pool.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    # Fill in port defaults from the chosen network where not given explicitly.
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = args.net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = args.net.WORKER_PORT
    
    # Validate --address now so a typo fails at startup, not at payout time.
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception, e:
            raise ValueError('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None  # per --address help: one is requested from bitcoind
    
    # Schedule main() for reactor startup, then block in the event loop.
    reactor.callWhenRunning(main, args)
    reactor.run()