fix bug where old share files weren't removed in some cases
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import datetime
7 import itertools
8 import os
9 import random
10 import sqlite3
11 import struct
12 import sys
13 import time
14 import json
15 import signal
16 import traceback
17
18 from twisted.internet import defer, reactor, task, threads
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
24 from util import db, expiring_dict, jsonrpc, variable, deferral, math
25 from . import p2p, worker_interface, skiplists
26 import p2pool.data as p2pool
27 import p2pool as p2pool_init
28
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, ht):
    """Fetch a block attempt from bitcoind plus the height of its parent block.

    bitcoind -- JSON-RPC proxy providing rpc_getwork()
    ht -- height tracker providing getHeight(block_hash)

    Fires with a (work, height) tuple. The work and the height come from two
    separate lookups, so a block could arrive in between them.
    """
    raw_work = yield bitcoind.rpc_getwork()
    work = bitcoin.getwork.BlockAttempt.from_getwork(raw_work)
    try:
        height = ht.getHeight(work.previous_block)
    except ValueError:
        # XXX placeholder height used when the tracker raises ValueError
        height = 1000
    defer.returnValue((work, height))
39
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Request a payout script from bitcoind through a P2P check_order call.

    Fires with the script on a 'success' reply and with None on a 'denied'
    reply. Any other reply raises ValueError.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    # defer.returnValue() raises internally, so each branch below is terminal
    if reply == 'success':
        defer.returnValue(res['script'])
    if reply == 'denied':
        defer.returnValue(None)
    raise ValueError('Unexpected reply: %r' % (res,))
50
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a payout script from the address bitcoind returns for the 'p2pool' account."""
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
55
56 @defer.inlineCallbacks
57 def main(args):
58     try:
59         if args.charts:
60             from . import draw
61         
62         print 'p2pool (version %s)' % (p2pool_init.__version__,)
63         print
64         
65         # connect to bitcoind over JSON-RPC and do initial getwork
66         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
67         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
68         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
69         good = yield deferral.retry('Error while checking bitcoind identity:', 1)(args.net.BITCOIN_RPC_CHECK)(bitcoind)
70         if not good:
71             print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
72             return
73         temp_work = yield deferral.retry('Error while testing getwork:', 1)(defer.inlineCallbacks(lambda: defer.returnValue(bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork())))))()
74         print '    ...success!'
75         print '    Current block hash: %x' % (temp_work.previous_block,)
76         print
77         
78         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
79         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
80         factory = bitcoin.p2p.ClientFactory(args.net)
81         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
82         my_script = yield get_payout_script(factory)
83         if args.pubkey_hash is None:
84             if my_script is None:
85                 print '    IP transaction denied ... falling back to sending to address.'
86                 my_script = yield get_payout_script2(bitcoind, args.net)
87         else:
88             my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
89         print '    ...success!'
90         print '    Payout script:', my_script.encode('hex')
91         print
92         
93         print 'Loading cached block headers...'
94         ht = bitcoin.p2p.HeightTracker(factory, args.net.NAME + '_headers.dat')
95         print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
96         print
97         
98         tracker = p2pool.OkayTracker(args.net)
99         ss = p2pool.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), args.net.NAME + '_shares.'), args.net)
100         known_verified = set()
101         print "Loading shares..."
102         for i, (mode, contents) in enumerate(ss.get_shares()):
103             if mode == 'share':
104                 if contents.hash in tracker.shares:
105                     continue
106                 contents.shared = True
107                 contents.stored = True
108                 tracker.add(contents)
109                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
110                     print "    %i" % (len(tracker.shares),)
111             elif mode == 'verified_hash':
112                 known_verified.add(contents)
113             else:
114                 raise AssertionError()
115         print "    ...inserting %i verified shares..." % (len(known_verified),)
116         for h in known_verified:
117             if h not in tracker.shares:
118                 ss.forget_verified_share(h)
119                 continue
120             tracker.verified.add(tracker.shares[h])
121         print "    ...done loading %i shares!" % (len(tracker.shares),)
122         print
123         tracker.added.watch(lambda share: ss.add_share(share))
124         tracker.verified.added.watch(lambda share: ss.add_verified_hash(share.hash))
125         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
126         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
127         
128         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
129         
130         # information affecting work that should trigger a long-polling update
131         current_work = variable.Variable(None)
132         # information affecting work that should not trigger a long-polling update
133         current_work2 = variable.Variable(None)
134         
135         work_updated = variable.Event()
136         
137         requested = expiring_dict.ExpiringDict(300)
138         
139         @defer.inlineCallbacks
140         def set_real_work1():
141             work, height = yield getwork(bitcoind, ht)
142             changed = work.previous_block != current_work.value['previous_block'] if current_work.value is not None else True
143             current_work.set(dict(
144                 version=work.version,
145                 previous_block=work.previous_block,
146                 target=work.target,
147                 height=height,
148                 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
149             ))
150             current_work2.set(dict(
151                 clock_offset=time.time() - work.timestamp,
152                 last_update=time.time(),
153             ))
154             if changed:
155                 set_real_work2()
156         
        def set_real_work2():
            """Re-evaluate the share chain and request missing parent shares.

            Asks tracker.think() for the best share hash and a list of desired
            (peer, share_hash) pairs, stores the best hash in current_work, then
            sends a getshares request for each desired hash that is still
            missing and has not been requested too recently.
            """
            best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            
            # copy before modifying so current_work.set() receives a new dict
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            # note: 't' is reused from here on as the current wall-clock time
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                # 'requested' maps share_hash -> (time of last request, number of requests so far)
                last_request_time, count = requested.get(share_hash, (None, 0))
                # back off: skip while inside a window that grows by a factor of 1.5 per earlier request
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                # collect connected peers recorded in peer_heads for any head above this tail
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    # first attempt: ask the peer suggested by tracker.think()
                    peer = peer2
                else:
                    # otherwise pick a random candidate when random.random() > .2, else fall back to peer2
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    # stop hashes: every head plus an ancestor up to 10 steps below each head, capped at 100 entries
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
191         
192         print 'Initializing work...'
193         yield set_real_work1()
194         set_real_work2()
195         print '    ...success!'
196         print
197         
198         start_time = time.time() - current_work2.value['clock_offset']
199         
200         # setup p2p logic and join p2pool network
201         
202         def share_share(share, ignore_peer=None):
203             for peer in p2p_node.peers.itervalues():
204                 if peer is ignore_peer:
205                     continue
206                 #if p2pool_init.DEBUG:
207                 #    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
208                 peer.send_shares([share])
209             share.flag_shared()
210         
211         def p2p_shares(shares, peer=None):
212             if len(shares) > 5:
213                 print 'Processing %i shares...' % (len(shares),)
214             
215             some_new = False
216             for share in shares:
217                 if share.hash in tracker.shares:
218                     #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
219                     continue
220                 some_new = True
221                 
222                 #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
223                 
224                 tracker.add(share)
225                 #for peer2, share_hash in desired:
226                 #    print 'Requesting parent share %x' % (share_hash,)
227                 #    peer2.send_getshares(hashes=[share_hash], parents=2000)
228                 
229                 if share.bitcoin_hash <= share.header['target']:
230                     print
231                     print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
232                     print
233                     if factory.conn.value is not None:
234                         factory.conn.value.send_block(block=share.as_block(tracker, args.net))
235                     else:
236                         print 'No bitcoind connection! Erp!'
237             
238             if shares and peer is not None:
239                 peer_heads.setdefault(shares[0].hash, set()).add(peer)
240             
241             if some_new:
242                 set_real_work2()
243             
244             if len(shares) > 5:
245                 print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
246         
247         def p2p_share_hashes(share_hashes, peer):
248             t = time.time()
249             get_hashes = []
250             for share_hash in share_hashes:
251                 if share_hash in tracker.shares:
252                     continue
253                 last_request_time, count = requested.get(share_hash, (None, 0))
254                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
255                     continue
256                 print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
257                 get_hashes.append(share_hash)
258                 requested[share_hash] = t, count + 1
259             
260             if share_hashes and peer is not None:
261                 peer_heads.setdefault(share_hashes[0], set()).add(peer)
262             if get_hashes:
263                 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
264         
265         def p2p_get_shares(share_hashes, parents, stops, peer):
266             parents = min(parents, 1000//len(share_hashes))
267             stops = set(stops)
268             shares = []
269             for share_hash in share_hashes:
270                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
271                     if share.hash in stops:
272                         break
273                     shares.append(share)
274             print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
275             peer.send_shares(shares, full=True)
276         
277         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
278         
279         def parse(x):
280             if ':' in x:
281                 ip, port = x.split(':')
282                 return ip, int(port)
283             else:
284                 return x, args.net.P2P_PORT
285         
286         nodes = set([
287             ('72.14.191.28', args.net.P2P_PORT),
288             ('62.204.197.159', args.net.P2P_PORT),
289             ('142.58.248.28', args.net.P2P_PORT),
290             ('94.23.34.145', args.net.P2P_PORT),
291         ])
292         for host in [
293             'p2pool.forre.st',
294             'dabuttonfactory.com',
295         ]:
296             try:
297                 nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
298             except:
299                 log.err(None, 'Error resolving bootstrap node IP:')
300         
301         p2p_node = p2p.Node(
302             current_work=current_work,
303             port=args.p2pool_port,
304             net=args.net,
305             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.NAME),
306             mode=0 if args.low_bandwidth else 1,
307             preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
308         )
309         p2p_node.handle_shares = p2p_shares
310         p2p_node.handle_share_hashes = p2p_share_hashes
311         p2p_node.handle_get_shares = p2p_get_shares
312         
313         p2p_node.start()
314         
315         # send share when the chain changes to their chain
316         def work_changed(new_work):
317             #print 'Work changed:', new_work
318             for share in tracker.get_chain_known(new_work['best_share_hash']):
319                 if share.shared:
320                     break
321                 share_share(share, share.peer)
322         current_work.changed.watch(work_changed)
323         
324         print '    ...success!'
325         print
326         
327         @defer.inlineCallbacks
328         def upnp_thread():
329             while True:
330                 try:
331                     is_lan, lan_ip = yield ipdiscover.get_local_ip()
332                     if is_lan:
333                         pm = yield portmapper.get_port_mapper()
334                         yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
335                 except:
336                     if p2pool_init.DEBUG:
337                         log.err(None, "UPnP error:")
338                 yield deferral.sleep(random.expovariate(1/120))
339         
340         if args.upnp:
341             upnp_thread()
342         
343         # start listening for workers with a JSON-RPC server
344         
345         print 'Listening for workers on port %i...' % (args.worker_port,)
346         
347         # setup worker logic
348         
349         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
350         run_identifier = struct.pack('<Q', random.randrange(2**64))
351         
352         share_counter = skiplists.CountsSkipList(tracker, run_identifier)
353         removed_unstales = set()
354         def get_share_counts():
355             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
356             matching_in_chain = share_counter(current_work.value['best_share_hash'], height) | removed_unstales
357             shares_in_chain = my_shares & matching_in_chain
358             stale_shares = my_shares - matching_in_chain
359             return len(shares_in_chain) + len(stale_shares), len(stale_shares)
360         @tracker.verified.removed.watch
361         def _(share):
362             if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
363                 removed_unstales.add(share.hash)
364         
        def compute(state, payout_script):
            """Build a new unit of work (a BlockAttempt) for a worker.

            state -- work dict with 'best_share_hash', 'height', 'target',
                'version' and 'previous_block' entries
            payout_script -- script to pay out to; None selects my_script

            Raises jsonrpc.Error(-12345) while shares are still downloading or
            no peers are connected (both only when args.net.PERSIST is set),
            and when bitcoind has not been heard from for over 60 seconds.
            Side effects: records the transaction list under its merkle root in
            merkle_root_to_transactions and the creation time under the share
            nonce in times.
            """
            if payout_script is None:
                payout_script = my_script
            if state['best_share_hash'] is None and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if len(p2p_node.peers) == 0 and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
            pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
            # take transactions in order until their packed size would exceed 500000 bytes
            extra_txs = []
            size = 0
            for tx in pre_extra_txs:
                this_size = len(bitcoin.data.tx_type.pack(tx.tx))
                if size + this_size > 500000:
                    break
                extra_txs.append(tx)
                size += this_size
            # XXX check sigops!
            # XXX assuming generate_tx is smallish here..
            def get_stale_frac():
                # Returns "" when there are no own shares yet; otherwise the stale
                # fraction scaled to 16 bits and packed twice (4 bytes in total).
                shares, stale_shares = get_share_counts()
                if shares == 0:
                    return ""
                frac = stale_shares/shares
                return 2*struct.pack('<H', int(65535*frac + .5))
            generate_tx = p2pool.generate_transaction(
                tracker=tracker,
                previous_share_hash=state['best_share_hash'],
                new_script=payout_script,
                subsidy=args.net.BITCOIN_SUBSIDY_FUNC(state['height']) + sum(tx.value_in - tx.value_out for tx in extra_txs),
                # nonce layout: 8-byte run identifier + 8 random bytes + optional stale fraction
                nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)) + get_stale_frac(),
                block_target=state['target'],
                net=args.net,
            )
            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8, args.net.BITCOIN_SYMBOL)
            #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + [tx.tx for tx in extra_txs]
            merkle_root = bitcoin.data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = transactions # kept in an ExpiringDict created with 300 (presumably seconds - TODO confirm)
            
            timestamp = int(time.time() - current_work2.value['clock_offset'])
            if state['best_share_hash'] is not None:
                # never go below (median timestamp of up to 11 most recent shares) + 1
                timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
                if timestamp2 > timestamp:
                    print 'Toff', timestamp2 - timestamp
                    timestamp = timestamp2
            target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            # remember when this work was handed out, keyed by the share nonce
            times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
            #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
418         
419         my_shares = set()
420         times = {}
421         
422         def got_response(data, user):
423             try:
424                 # match up with transactions
425                 header = bitcoin.getwork.decode_data(data)
426                 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
427                 if transactions is None:
428                     print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
429                     return False
430                 block = dict(header=header, txs=transactions)
431                 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
432                 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
433                     if factory.conn.value is not None:
434                         factory.conn.value.send_block(block=block)
435                     else:
436                         print 'No bitcoind connection! Erp!'
437                     if hash_ <= block['header']['target']:
438                         print
439                         print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
440                         print
441                 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
442                 if hash_ > target:
443                     print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (hash_, target)
444                     return False
445                 share = p2pool.Share.from_block(block)
446                 my_shares.add(share.hash)
447                 print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
448                 good = share.previous_hash == current_work.value['best_share_hash']
449                 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
450                 p2p_shares([share])
451                 # eg. good = share.hash == current_work.value['best_share_hash'] here
452                 return good
453             except:
454                 log.err(None, 'Error processing data received from worker:')
455                 return False
456         
457         web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
458         
459         def get_rate():
460             if current_work.value['best_share_hash'] is not None:
461                 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
462                 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
463                 return json.dumps(att_s)
464             return json.dumps(None)
465         
466         def get_users():
467             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
468             weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
469             res = {}
470             for script in sorted(weights, key=lambda s: weights[s]):
471                 res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
472             return json.dumps(res)
473         
474         class WebInterface(resource.Resource):
475             def __init__(self, func, mime_type):
476                 self.func, self.mime_type = func, mime_type
477             
478             def render_GET(self, request):
479                 request.setHeader('Content-Type', self.mime_type)
480                 return self.func()
481         
482         web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
483         web_root.putChild('users', WebInterface(get_users, 'application/json'))
484         if args.charts:
485             web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
486         
487         reactor.listenTCP(args.worker_port, server.Site(web_root))
488         
489         print '    ...success!'
490         print
491         
492         # done!
493         
494         tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
495         get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
496         
        class Tx(object):
            """A transaction held in tx_pool, with bookkeeping about its inputs.

            NOTE(review): is_good() calls self.is_good2(), which is not defined
            in this class as visible here - confirm where it is meant to come
            from before new_tx is enabled again.
            """
            def __init__(self, tx, seen_at_block):
                """Store tx and start the asynchronous lookup of its parent transactions.

                tx -- transaction dict with 'tx_ins' and 'tx_outs' entries
                seen_at_block -- value stored as-is; new_tx passes the current previous_block
                """
                self.hash = bitcoin.data.tx_type.hash256(tx)
                self.tx = tx
                self.seen_at_block = seen_at_block
                # own hash plus the hashes of every transaction it spends from
                self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                #print
                #print '%x %r' % (seen_at_block, tx)
                #for mention in self.mentions:
                #    print '%x' % mention
                #print
                self.parents_all_in_blocks = False
                # value_in is accumulated asynchronously by _find_parents_in_blocks
                self.value_in = 0
                #print self.tx
                self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
                # not waited for: the returned Deferred is discarded
                self._find_parents_in_blocks()
            
            @defer.inlineCallbacks
            def _find_parents_in_blocks(self):
                """Sum the input values and set parents_all_in_blocks if every parent has parent_blocks.

                Stops early, leaving parents_all_in_blocks False, when a parent
                lookup raises or a parent has no 'parent_blocks'.
                """
                for tx_in in self.tx['tx_ins']:
                    try:
                        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
                    except Exception:
                        return
                    self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
                    #print raw_transaction
                    if not raw_transaction['parent_blocks']:
                        return
                self.parents_all_in_blocks = True
            
            def is_good(self):
                """Return False until all parents are known to be in blocks, else the result of is_good2()."""
                if not self.parents_all_in_blocks:
                    return False
                x = self.is_good2()
                #print 'is_good:', x
                return x
533         
534         @defer.inlineCallbacks
535         def new_tx(tx_hash):
536             try:
537                 assert isinstance(tx_hash, (int, long))
538                 #print 'REQUESTING', tx_hash
539                 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
540                 #print 'GOT', tx
541                 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
542             except:
543                 log.err(None, 'Error handling tx:')
544         # disable for now, for testing impact on stales
545         #factory.new_tx.watch(new_tx)
546         
        def new_block(block_hash):
            """Fire the work_updated event; block_hash itself is not used."""
            work_updated.happened()
549         factory.new_block.watch(new_block)
550         
551         print 'Started successfully!'
552         print
553         
554         ht.updated.watch(set_real_work2)
555         
556         @defer.inlineCallbacks
557         def work1_thread():
558             while True:
559                 flag = work_updated.get_deferred()
560                 try:
561                     yield set_real_work1()
562                 except:
563                     log.err()
564                 yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
565         
566         @defer.inlineCallbacks
567         def work2_thread():
568             while True:
569                 try:
570                     set_real_work2()
571                 except:
572                     log.err()
573                 yield deferral.sleep(random.expovariate(1/20))
574         
575         work1_thread()
576         work2_thread()
577         
578         
579         if hasattr(signal, 'SIGALRM'):
            def watchdog_handler(signum, frame):
                """SIGALRM handler: print a notice followed by the current stack trace."""
                print 'Watchdog timer went off at:'
                traceback.print_stack()
583             
584             signal.signal(signal.SIGALRM, watchdog_handler)
585             task.LoopingCall(signal.alarm, 30).start(1)
586         
587         
588         def read_stale_frac(share):
589             if len(share.nonce) != 20:
590                 return None
591             a, b = struct.unpack("<HH", share.nonce[-4:])
592             if a != b:
593                 return None
594             return a/65535
595         
596         while True:
597             yield deferral.sleep(3)
598             try:
599                 if time.time() > current_work2.value['last_update'] + 60:
600                     print '''LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead'''
601                 if current_work.value['best_share_hash'] is not None:
602                     height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
603                     if height > 2:
604                         att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 120))
605                         weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
606                         shares, stale_shares = get_share_counts()
607                         print 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
608                             math.format(att_s),
609                             height,
610                             len(tracker.verified.shares),
611                             len(tracker.shares),
612                             weights.get(my_script, 0)/total_weight*100,
613                             math.format(weights.get(my_script, 0)/total_weight*att_s),
614                             shares,
615                             stale_shares,
616                             len(p2p_node.peers),
617                         ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
618                         fracs = [read_stale_frac(share) for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if read_stale_frac(share) is not None]
619                         if fracs:
620                             med = math.median(fracs)
621                             print 'Median stale proportion:', med
622                             if shares:
623                                 print '    Own:', stale_shares/shares
624                                 if med < .99:
625                                     print '    Own efficiency: %.02f%%' % (100*(1 - stale_shares/shares)/(1 - med),)
626                             
627                             
628             except:
629                 log.err()
630     except:
631         log.err(None, 'Fatal error:')
632     finally:
633         reactor.stop()
634
635 def run():
636     parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,), fromfile_prefix_chars='@')
637     parser.convert_arg_line_to_args = lambda arg_line: (arg for arg in arg_line.split() if arg.strip())
638     parser.add_argument('--version', action='version', version=p2pool_init.__version__)
639     parser.add_argument('--net',
640         help='use specified network (default: bitcoin)',
641         action='store', choices=sorted(x for x in p2pool.nets if 'testnet' not in x), default='bitcoin', dest='net_name')
642     parser.add_argument('--testnet',
643         help='''use the network's testnet''',
644         action='store_const', const=True, default=False, dest='testnet')
645     parser.add_argument('--debug',
646         help='debugging mode',
647         action='store_const', const=True, default=False, dest='debug')
648     parser.add_argument('-a', '--address',
649         help='generate to this address (defaults to requesting one from bitcoind)',
650         type=str, action='store', default=None, dest='address')
651     parser.add_argument('--charts',
652         help='generate charts on the web interface (requires PIL and pygame)',
653         action='store_const', const=True, default=False, dest='charts')
654     parser.add_argument('--logfile',
655         help='log to specific file (defaults to <network_name>.log)',
656         type=str, action='store', default=None, dest='logfile')
657     
658     p2pool_group = parser.add_argument_group('p2pool interface')
659     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
660         help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
661         type=int, action='store', default=None, dest='p2pool_port')
662     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
663         help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
664         type=str, action='append', default=[], dest='p2pool_nodes')
665     parser.add_argument('-l', '--low-bandwidth',
666         help='trade lower bandwidth usage for higher latency (reduced efficiency)',
667         action='store_true', default=False, dest='low_bandwidth')
668     parser.add_argument('--disable-upnp',
669         help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
670         action='store_false', default=True, dest='upnp')
671     
672     worker_group = parser.add_argument_group('worker interface')
673     worker_group.add_argument('-w', '--worker-port', metavar='PORT',
674         help='listen on PORT for RPC connections from miners asking for work and providing responses (default: bitcoin: 9332 namecoin: 9331 ixcoin: 9330 i0coin: 9329, +10000 for testnets)',
675         type=int, action='store', default=None, dest='worker_port')
676     
677     bitcoind_group = parser.add_argument_group('bitcoind interface')
678     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
679         help='connect to a bitcoind at this address (default: 127.0.0.1)',
680         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
681     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
682         help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332, 8338 for ixcoin)',
683         type=int, action='store', default=None, dest='bitcoind_rpc_port')
684     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
685         help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
686         type=int, action='store', default=None, dest='bitcoind_p2p_port')
687     
688     bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
689         help='bitcoind RPC interface username',
690         type=str, action='store', dest='bitcoind_rpc_username')
691     bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
692         help='bitcoind RPC interface password',
693         type=str, action='store', dest='bitcoind_rpc_password')
694     
695     args = parser.parse_args()
696     
697     if args.debug:
698         p2pool_init.DEBUG = True
699     
700     if args.logfile is None:
701        args.logfile = args.net_name + ('_testnet' if args.testnet else '') + '.log'
702     
703     class ReopeningFile(object):
704         def __init__(self, *open_args, **open_kwargs):
705             self.open_args, self.open_kwargs = open_args, open_kwargs
706             self.inner_file = open(*self.open_args, **self.open_kwargs)
707         def reopen(self):
708             self.inner_file.close()
709             self.inner_file = open(*self.open_args, **self.open_kwargs)
710         def write(self, data):
711             self.inner_file.write(data)
712         def flush(self):
713             self.inner_file.flush()
714     class TeePipe(object):
715         def __init__(self, outputs):
716             self.outputs = outputs
717         def write(self, data):
718             for output in self.outputs:
719                 output.write(data)
720         def flush(self):
721             for output in self.outputs:
722                 output.flush()
723     class TimestampingPipe(object):
724         def __init__(self, inner_file):
725             self.inner_file = inner_file
726             self.buf = ''
727             self.softspace = 0
728         def write(self, data):
729             buf = self.buf + data
730             lines = buf.split('\n')
731             for line in lines[:-1]:
732                 self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
733                 self.inner_file.flush()
734             self.buf = lines[-1]
735         def flush(self):
736             pass
737     logfile = ReopeningFile(os.path.join(os.path.dirname(sys.argv[0]), 'debug.log'), 'w')
738     sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
739     if hasattr(signal, "SIGUSR1"):
740         def sigusr1(signum, frame):
741             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
742             logfile.reopen()
743             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
744         signal.signal(signal.SIGUSR1, sigusr1)
745     
746     args.net = p2pool.nets[args.net_name + ('_testnet' if args.testnet else '')]
747     
748     if args.bitcoind_rpc_port is None:
749         args.bitcoind_rpc_port = args.net.BITCOIN_RPC_PORT
750     
751     if args.bitcoind_p2p_port is None:
752         args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
753     
754     if args.p2pool_port is None:
755         args.p2pool_port = args.net.P2P_PORT
756     
757     if args.worker_port is None:
758         args.worker_port = args.net.WORKER_PORT
759     
760     if args.address is not None:
761         try:
762             args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
763         except Exception, e:
764             raise ValueError('error parsing address: ' + repr(e))
765     else:
766         args.pubkey_hash = None
767     
768     reactor.callWhenRunning(main, args)
769     reactor.run()