fix handling of empty arguments
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import datetime
7 import itertools
8 import os
9 import random
10 import sqlite3
11 import struct
12 import sys
13 import time
14 import json
15 import signal
16 import traceback
17
18 from twisted.internet import defer, reactor, task, threads
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
24 from util import db, expiring_dict, jsonrpc, variable, deferral, math
25 from . import p2p, worker_interface, skiplists
26 import p2pool.data as p2pool
27 import p2pool as p2pool_init
28
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, ht):
    """Fetch the current getwork job from bitcoind and the height of its parent.

    Fires the deferred with (BlockAttempt, height). A block could arrive in
    between the getwork call and the height lookup.
    """
    raw = yield bitcoind.rpc_getwork()
    attempt = bitcoin.getwork.BlockAttempt.from_getwork(raw)
    try:
        prev_height = ht.getHeight(attempt.previous_block)
    except ValueError:
        prev_height = 1000 # XXX height unknown - fixed fallback
    defer.returnValue((attempt, prev_height))
39
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind over its P2P connection for a payout script via a null checkorder.

    Fires with the script on success, None if the IP transaction was denied,
    and raises ValueError on any other reply.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    if reply == 'success':
        defer.returnValue(res['script'])
    if reply == 'denied':
        defer.returnValue(None)
    raise ValueError('Unexpected reply: %r' % (res,))
50
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Fall back to paying to bitcoind's 'p2pool' account address (fetched over JSON-RPC)."""
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
55
@defer.inlineCallbacks
def main(args):
    """Run the p2pool node: verify bitcoind, load shares, join the p2pool
    network, and serve workers. `args` is the parsed argparse namespace.
    Stops the reactor on exit.
    """
    try:
        if args.charts:
            from . import draw
        
        print 'p2pool (version %s)' % (p2pool_init.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getwork
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        # BITCOIN_RPC_CHECK verifies we are talking to the right kind of bitcoind
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(args.net.BITCOIN_RPC_CHECK)(bitcoind)
        if not good:
            print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield deferral.retry('Error while testing getwork:', 1)(defer.inlineCallbacks(lambda: defer.returnValue(bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork())))))()
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work.previous_block,)
        print
        
        # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin.p2p.ClientFactory(args.net)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        my_script = yield get_payout_script(factory)
        if args.pubkey_hash is None:
            if my_script is None:
                # checkorder denied - pay to bitcoind's account address instead
                print '    IP transaction denied ... falling back to sending to address.'
                my_script = yield get_payout_script2(bitcoind, args.net)
        else:
            # an explicit payout address overrides both mechanisms
            my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', my_script.encode('hex')
        print
        
        print 'Loading cached block headers...'
        ht = bitcoin.p2p.HeightTracker(factory, args.net.NAME + '_headers.dat')
        print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
        print
        
        # load the persisted share chain from disk into the tracker
        tracker = p2pool.OkayTracker(args.net)
        ss = p2pool.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), args.net.NAME + '_shares.'), args.net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                contents.shared = True
                contents.stored = True
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                # verified hash with no matching share - drop it from the store
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        # keep the on-disk store in sync with the tracker from now on
        tracker.added.watch(lambda share: ss.add_share(share))
        tracker.verified.added.watch(lambda share: ss.add_verified_hash(share.hash))
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        work_updated = variable.Event()
        
        # share_hash -> (last_request_time, request_count), for backoff on getshares
        requested = expiring_dict.ExpiringDict(300)
139         @defer.inlineCallbacks
140         def set_real_work1():
141             work, height = yield getwork(bitcoind, ht)
142             changed = work.previous_block != current_work.value['previous_block'] if current_work.value is not None else True
143             current_work.set(dict(
144                 version=work.version,
145                 previous_block=work.previous_block,
146                 target=work.target,
147                 height=height,
148                 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
149             ))
150             current_work2.set(dict(
151                 clock_offset=time.time() - work.timestamp,
152                 last_update=time.time(),
153             ))
154             if changed:
155                 set_real_work2()
156         
        def set_real_work2():
            # Re-evaluate the best share chain and request any missing parent
            # shares the tracker says it wants.
            best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                # exponential backoff: skip hashes requested recently
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                # find connected peers that claim to know a head descending from this hash
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    # first attempt: ask the peer that advertised it
                    peer = peer2
                else:
                    # retries: usually pick a random knowledgeable peer instead
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
191         
        print 'Initializing work...'
        # prime current_work/current_work2 before anything depends on them
        yield set_real_work1()
        set_real_work2()
        print '    ...success!'
        print
        
        start_time = time.time() - current_work2.value['clock_offset']
        
        # setup p2p logic and join p2pool network
201         
202         def share_share(share, ignore_peer=None):
203             for peer in p2p_node.peers.itervalues():
204                 if peer is ignore_peer:
205                     continue
206                 #if p2pool_init.DEBUG:
207                 #    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
208                 peer.send_shares([share])
209             share.flag_shared()
210         
        def p2p_shares(shares, peer=None):
            # Handle shares arriving from the p2p network (peer set) or from our
            # own workers (peer=None): track the new ones, pass any that solve a
            # bitcoin block to bitcoind, and re-think the best chain if needed.
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            some_new = False
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
                    continue
                some_new = True
                
                #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
                #for peer2, share_hash in desired:
                #    print 'Requesting parent share %x' % (share_hash,)
                #    peer2.send_getshares(hashes=[share_hash], parents=2000)
                
                if share.bitcoin_hash <= share.header['target']:
                    # this share's proof-of-work also meets the bitcoin block target
                    print
                    print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
                    print
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=share.as_block(tracker, args.net))
                    else:
                        print 'No bitcoind connection! Erp!'
            
            if shares and peer is not None:
                # remember that this peer knows about the head of this batch
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if some_new:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
246         
        def p2p_share_hashes(share_hashes, peer):
            # Handle share hashes advertised by a peer: request the ones we
            # don't have, using the same backoff bookkeeping as set_real_work2.
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if share_hashes and peer is not None:
                # the first advertised hash acts as this peer's known head
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
264         
265         def p2p_get_shares(share_hashes, parents, stops, peer):
266             parents = min(parents, 1000//len(share_hashes))
267             stops = set(stops)
268             shares = []
269             for share_hash in share_hashes:
270                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
271                     if share.hash in stops:
272                         break
273                     shares.append(share)
274             print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
275             peer.send_shares(shares, full=True)
276         
        print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
        
        def parse(x):
            # parse "host" or "host:port" into a (host, port) tuple,
            # defaulting to the network's standard p2p port
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, args.net.P2P_PORT
        
        # hard-coded bootstrap nodes, plus hostnames resolved below
        nodes = set([
            ('72.14.191.28', args.net.P2P_PORT),
            ('62.204.197.159', args.net.P2P_PORT),
            ('142.58.248.28', args.net.P2P_PORT),
            ('94.23.34.145', args.net.P2P_PORT),
        ])
        for host in [
            'p2pool.forre.st',
            'dabuttonfactory.com',
        ]:
            try:
                nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
            except:
                # best-effort: a failed bootstrap DNS lookup is not fatal
                log.err(None, 'Error resolving bootstrap node IP:')
300         
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=args.net,
            # known peer addresses persisted in a SQLite-backed dict
            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.NAME),
            mode=0 if args.low_bandwidth else 1,
            preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
        )
        # wire the p2p callbacks to the handlers defined above
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
314         
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            # walk the new best chain from its head and broadcast any of its
            # shares that have not been sent to peers yet
            for share in tracker.get_chain_known(new_work['best_share_hash']):
                if share.shared:
                    break
                share_share(share, share.peer)
        current_work.changed.watch(work_changed)
        
        print '    ...success!'
        print
326         
        @defer.inlineCallbacks
        def upnp_thread():
            # periodically try to forward our p2p port on the LAN gateway via UPnP
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except:
                    # best-effort: UPnP failures are only logged in debug mode
                    if p2pool_init.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
342         
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        # merkle root -> transaction list for work we handed out (expires after 300s)
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        # random per-run tag embedded in share nonces so we can recognize our own shares
        run_identifier = struct.pack('<Q', random.randrange(2**64))
        
        share_counter = skiplists.CountsSkipList(tracker, run_identifier)
        removed_unstales = set()
        def get_share_counts():
            # return (total, stale) counts of our own shares relative to the
            # current best chain; my_shares is defined below, filled by got_response
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            matching_in_chain = share_counter(current_work.value['best_share_hash'], height) | removed_unstales
            shares_in_chain = my_shares & matching_in_chain
            stale_shares = my_shares - matching_in_chain
            return len(shares_in_chain) + len(stale_shares), len(stale_shares)
        @tracker.verified.removed.watch
        def _(share):
            # when one of our verified in-chain shares falls off the tracker,
            # remember it so it keeps counting as non-stale
            if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_unstales.add(share.hash)
364         
        def compute(state, payout_script):
            # Build a getwork job for a worker: assemble the generate transaction
            # plus as many good mempool transactions as fit, and return a
            # BlockAttempt carrying the share target.
            if payout_script is None:
                payout_script = my_script
            if state['best_share_hash'] is None and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if len(p2p_node.peers) == 0 and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
            pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
            # greedily take transactions until the ~500kB size budget is hit
            extra_txs = []
            size = 0
            for tx in pre_extra_txs:
                this_size = len(bitcoin.data.tx_type.pack(tx.tx))
                if size + this_size > 500000:
                    break
                extra_txs.append(tx)
                size += this_size
            # XXX check sigops!
            # XXX assuming generate_tx is smallish here..
            def get_stale_frac():
                # encode our stale fraction as a 16-bit value stored twice,
                # so read_stale_frac below can sanity-check it (a == b)
                shares, stale_shares = get_share_counts()
                if shares == 0:
                    return ""
                frac = stale_shares/shares
                return 2*struct.pack('<H', int(65535*frac + .5))
            generate_tx = p2pool.generate_transaction(
                tracker=tracker,
                previous_share_hash=state['best_share_hash'],
                new_script=payout_script,
                subsidy=args.net.BITCOIN_SUBSIDY_FUNC(state['height']) + sum(tx.value_in - tx.value_out for tx in extra_txs),
                nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)) + get_stale_frac(),
                block_target=state['target'],
                net=args.net,
            )
            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8, args.net.BITCOIN_SYMBOL)
            #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + [tx.tx for tx in extra_txs]
            merkle_root = bitcoin.data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = transactions # kept until merkle_root_to_transactions expires it (300s)
            
            timestamp = int(time.time() - current_work2.value['clock_offset'])
            if state['best_share_hash'] is not None:
                # don't let the timestamp fall below median-of-last-11 + 1
                timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
                if timestamp2 > timestamp:
                    print 'Toff', timestamp2 - timestamp
                    timestamp = timestamp2
            target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            # record when this nonce's work was handed out, for the age printout
            times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
            #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
418         
        my_shares = set() # hashes of shares submitted by our own workers
        times = {} # share nonce -> time.time() when the work was handed out (set in compute)
421         
422         def got_response(data, user):
423             try:
424                 # match up with transactions
425                 header = bitcoin.getwork.decode_data(data)
426                 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
427                 if transactions is None:
428                     print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
429                     return False
430                 block = dict(header=header, txs=transactions)
431                 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
432                 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
433                     if factory.conn.value is not None:
434                         factory.conn.value.send_block(block=block)
435                     else:
436                         print 'No bitcoind connection! Erp!'
437                     if hash_ <= block['header']['target']:
438                         print
439                         print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
440                         print
441                 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
442                 if hash_ > target:
443                     print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (hash_, target)
444                     return False
445                 share = p2pool.Share.from_block(block)
446                 my_shares.add(share.hash)
447                 print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
448                 good = share.previous_hash == current_work.value['best_share_hash']
449                 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
450                 p2p_shares([share])
451                 # eg. good = share.hash == current_work.value['best_share_hash'] here
452                 return good
453             except:
454                 log.err(None, 'Error processing data received from worker:')
455                 return False
456         
        web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
        
        def get_rate():
            # JSON: pool attempts/second over the last <=720 shares, or null
            # if there is no best share yet
            if current_work.value['best_share_hash'] is not None:
                height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
                return json.dumps(att_s)
            return json.dumps(None)
465         
466         def get_users():
467             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
468             weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
469             res = {}
470             for script in sorted(weights, key=lambda s: weights[s]):
471                 res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
472             return json.dumps(res)
473         
474         class WebInterface(resource.Resource):
475             def __init__(self, func, mime_type):
476                 self.func, self.mime_type = func, mime_type
477             
478             def render_GET(self, request):
479                 request.setHeader('Content-Type', self.mime_type)
480                 return self.func()
481         
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        if args.charts:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        # done!
        
        tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
        # memoized bitcoind getrawtransaction lookups (cache expires after 100s)
        get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
496         
        class Tx(object):
            # Wrapper around a mempool transaction that tracks whether all of
            # its inputs are confirmed in blocks, so compute() can decide
            # whether to include it in generated work.
            def __init__(self, tx, seen_at_block):
                self.hash = bitcoin.data.tx_type.hash256(tx)
                self.tx = tx
                self.seen_at_block = seen_at_block
                # hashes this tx "mentions": itself plus every input's previous output
                self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                #print
                #print '%x %r' % (seen_at_block, tx)
                #for mention in self.mentions:
                #    print '%x' % mention
                #print
                self.parents_all_in_blocks = False
                self.value_in = 0
                #print self.tx
                self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
                self._find_parents_in_blocks()
            
            @defer.inlineCallbacks
            def _find_parents_in_blocks(self):
                # resolve each input's parent transaction; bail out (leaving
                # parents_all_in_blocks False) if any lookup fails or any
                # parent is not yet in a block
                for tx_in in self.tx['tx_ins']:
                    try:
                        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
                    except Exception:
                        return
                    self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
                    #print raw_transaction
                    if not raw_transaction['parent_blocks']:
                        return
                self.parents_all_in_blocks = True
            
            def is_good(self):
                if not self.parents_all_in_blocks:
                    return False
                # NOTE(review): is_good2 is not defined in this class or anywhere
                # visible in this file - confirm it exists elsewhere, otherwise
                # this raises AttributeError (tx_pool use is disabled below)
                x = self.is_good2()
                #print 'is_good:', x
                return x
533         
        @defer.inlineCallbacks
        def new_tx(tx_hash):
            # fetch a newly-announced transaction over bitcoind's p2p connection
            # and add it to the pool, tagged with the block it was first seen at
            try:
                assert isinstance(tx_hash, (int, long))
                #print 'REQUESTING', tx_hash
                tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
                #print 'GOT', tx
                tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
            except:
                log.err(None, 'Error handling tx:')
        # disable for now, for testing impact on stales
        #factory.new_tx.watch(new_tx)
546         
        def new_block(block_hash):
            # a new bitcoin block arrived - wake the work-refresh loop immediately
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        # re-think the best chain whenever new block headers come in
        ht.updated.watch(set_real_work2)
555         
        @defer.inlineCallbacks
        def work1_thread():
            # refresh bitcoind work every 1-10s, or immediately when a
            # new-block notification fires the work_updated event
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
        
        @defer.inlineCallbacks
        def work2_thread():
            # periodically re-run the share-chain logic as a safety net
            while True:
                try:
                    set_real_work2()
                except:
                    log.err()
                yield deferral.sleep(random.expovariate(1/20))
        
        work1_thread()
        work2_thread()
577         
578         
        if hasattr(signal, 'SIGALRM'):
            # watchdog: the LoopingCall re-arms a 30s alarm every second; if the
            # reactor stalls for 30s, SIGALRM fires and dumps a stack trace
            def watchdog_handler(signum, frame):
                print 'Watchdog timer went off at:'
                traceback.print_stack()
            
            signal.signal(signal.SIGALRM, watchdog_handler)
            task.LoopingCall(signal.alarm, 30).start(1)
586         
587         
588         def read_stale_frac(share):
589             if len(share.nonce) != 20:
590                 return None
591             a, b = struct.unpack("<HH", share.nonce[-4:])
592             if a != b:
593                 return None
594             return a/65535
595         
        # status loop: print pool statistics roughly every 3 seconds
        while True:
            yield deferral.sleep(3)
            try:
                if time.time() > current_work2.value['last_update'] + 60:
                    print '''LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead'''
                if current_work.value['best_share_hash'] is not None:
                    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                    if height > 2:
                        att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 120))
                        weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 120), 2**100)
                        shares, stale_shares = get_share_counts()
                        print 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
                            math.format(att_s),
                            height,
                            len(tracker.verified.shares),
                            len(tracker.shares),
                            weights.get(my_script, 0)/total_weight*100,
                            math.format(weights.get(my_script, 0)/total_weight*att_s),
                            shares,
                            stale_shares,
                            len(p2p_node.peers),
                        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
                        # stale fractions embedded in the last 120 shares' nonces
                        fracs = [read_stale_frac(share) for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if read_stale_frac(share) is not None]
                        if fracs:
                            med = math.median(fracs)
                            print 'Median stale proportion:', med
                            if shares:
                                print '    Own:', stale_shares/shares
                                if med < .99:
                                    print '    Own efficiency: %.02f%%' % (100*(1 - stale_shares/shares)/(1 - med),)
                            
                            
            except:
                log.err()
    except:
        log.err(None, 'Fatal error:')
    finally:
        reactor.stop()
634
def run():
    """Entry point: parse command-line arguments, set up logging, and start p2pool.

    Resolves network-dependent defaults for any ports the user did not give,
    redirects stdout/stderr (and twisted's log observer) through a timestamping
    tee of the console and a logfile, installs a SIGUSR1 logfile-reopen handler
    where the platform supports it, then schedules main() and runs the twisted
    reactor. Blocks until the reactor stops.
    """
    class FixedArgumentParser(argparse.ArgumentParser):
        """ArgumentParser with hardened @file argument expansion.

        Unlike the stock implementation, this version tolerates empty argument
        strings, splits each line of an args file into multiple arguments via
        convert_arg_line_to_args, and expands referenced files recursively.
        """
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list.
                # The 'not arg_string' guard avoids indexing into an empty
                # string (IndexError) when an empty argument is passed.
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            # recurse so an args file may itself reference further @files
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,), fromfile_prefix_chars='@')
    # Split each args-file line on whitespace and drop empty tokens
    # (stock argparse treats a whole line as one argument).
    parser.convert_arg_line_to_args = lambda arg_line: (arg for arg in arg_line.split() if arg.strip())
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(x for x in p2pool.nets if 'testnet' not in x), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--charts',
        help='generate charts on the web interface (requires PIL and pygame)',
        action='store_const', const=True, default=False, dest='charts')
    parser.add_argument('--logfile',
        help='''log to specific file (defaults to <network_name>.log in run_p2pool.py's directory)''',
        type=str, action='store', default=None, dest='logfile')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: bitcoin: 9332 namecoin: 9331 ixcoin: 9330 i0coin: 9329, +10000 for testnets)',
        type=int, action='store', default=None, dest='worker_port')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332, 8338 for ixcoin)',
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # Positional arguments: RPC username is optional (nargs='?'), password required.
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: empty)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool_init.DEBUG = True
    
    # Default the logfile to <network_name>[_testnet].log next to the startup script.
    if args.logfile is None:
       args.logfile = os.path.join(os.path.dirname(sys.argv[0]), args.net_name + ('_testnet' if args.testnet else '') + '.log')
    
    class ReopeningFile(object):
        """File-like wrapper that can close and reopen its underlying file
        (with the original open() arguments) without users noticing -
        used so an external process can rotate the log (see SIGUSR1 below)."""
        def __init__(self, *open_args, **open_kwargs):
            self.open_args, self.open_kwargs = open_args, open_kwargs
            self.inner_file = open(*self.open_args, **self.open_kwargs)
        def reopen(self):
            # close and reopen with the same arguments, e.g. after log rotation
            self.inner_file.close()
            self.inner_file = open(*self.open_args, **self.open_kwargs)
        def write(self, data):
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class TeePipe(object):
        """File-like object that duplicates every write/flush to each of
        the file-like objects in 'outputs'."""
        def __init__(self, outputs):
            self.outputs = outputs
        def write(self, data):
            for output in self.outputs:
                output.write(data)
        def flush(self):
            for output in self.outputs:
                output.flush()
    class TimestampingPipe(object):
        """File-like object that prefixes each complete line written through
        it with a HH:MM:SS.microseconds timestamp, buffering partial lines
        until their terminating newline arrives."""
        def __init__(self, inner_file):
            self.inner_file = inner_file
            # buffer holding the current, not-yet-newline-terminated partial line
            self.buf = ''
            # file-object attribute used by Python 2's print statement machinery
            self.softspace = 0
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            # emit every complete line with a timestamp; keep the remainder buffered
            for line in lines[:-1]:
                self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
                self.inner_file.flush()
            self.buf = lines[-1]
        def flush(self):
            # intentionally a no-op: write() already flushes after each line
            pass
    logfile = ReopeningFile(args.logfile, 'w')
    # Route stdout, stderr and twisted's default log observer through the
    # timestamping tee, so everything goes to both the console and the logfile.
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
    if hasattr(signal, "SIGUSR1"):
        # Where the platform provides SIGUSR1, use it to reopen the logfile
        # so external log-rotation tools can work.
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    
    # Resolve the network definition object (e.g. 'bitcoin' or 'bitcoin_testnet').
    args.net = p2pool.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    # Fill in network-specific defaults for any ports not given explicitly.
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = args.net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = args.net.WORKER_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception, e:
            raise ValueError('error parsing address: ' + repr(e))
    else:
        # no address given - per the --address help, one is requested from bitcoind
        args.pubkey_hash = None
    
    # Schedule main() for when the reactor is up; reactor.run() blocks until stopped.
    reactor.callWhenRunning(main, args)
    reactor.run()