added support for luke-jr's "Bugfix: getmemorypool 'bits' should be a hex-string"
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import datetime
7 import itertools
8 import os
9 import random
10 import sqlite3
11 import struct
12 import sys
13 import time
14 import json
15 import signal
16 import traceback
17
18 from twisted.internet import defer, reactor, task
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
24 from util import db, expiring_dict, jsonrpc, variable, deferral, math
25 from . import p2p, worker_interface, skiplists
26 import p2pool.data as p2pool
27 import p2pool as p2pool_init
28
29 @deferral.retry('Error getting work from bitcoind:', 3)
30 @defer.inlineCallbacks
31 def getwork(bitcoind, ht, net):
32     try:
33         work = yield bitcoind.rpc_getmemorypool()
34         defer.returnValue(dict(
35             version=work['version'],
36             previous_block_hash=int(work['previousblockhash'], 16),
37             transactions=[bitcoin.data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
38             subsidy=work['coinbasevalue'],
39             time=work['time'],
40             target=bitcoin.data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin.data.FloatingInteger(work['bits']),
41         ))
42     except jsonrpc.Error, e:
43         if e.code != -32601:
44             raise
45         
46         print "---> Update your bitcoind to support the 'getmemorypool' RPC call. Not including transactions in generated blocks! <---"
47         work = bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork()))
48         try:
49             subsidy = net.BITCOIN_SUBSIDY_FUNC(ht.getHeight(work.previous_block))
50         except ValueError:
51             subsidy = net.BITCOIN_SUBSIDY_FUNC(1000)
52         
53         defer.returnValue(dict(
54             version=work.version,
55             previous_block_hash=work.previous_block,
56             transactions=[],
57             subsidy=subsidy,
58             time=work.timestamp,
59             target=work.block_target,
60         ))
61
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind over bitcoin-p2p (checkorder) for an IP-transaction
    payout script. Returns the script on success, None when denied, and
    raises ValueError on any other reply."""
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    if reply == 'denied':
        defer.returnValue(None)
    if reply != 'success':
        raise ValueError('Unexpected reply: %r' % (res,))
    defer.returnValue(res['script'])
72
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a payout script paying to bitcoind's 'p2pool' account address."""
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
77
78 @defer.inlineCallbacks
79 def main(args):
80     try:
81         print 'p2pool (version %s)' % (p2pool_init.__version__,)
82         print
83         try:
84             from . import draw
85         except ImportError:
86             draw = None
87             print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
88             print
89         
90         # connect to bitcoind over JSON-RPC and do initial getwork
91         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
92         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
93         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
94         good = yield deferral.retry('Error while checking bitcoind identity:', 1)(args.net.BITCOIN_RPC_CHECK)(bitcoind)
95         if not good:
96             print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
97             return
98         temp_work = yield deferral.retry('Error while testing getwork:', 1)(defer.inlineCallbacks(lambda: defer.returnValue(bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork())))))()
99         print '    ...success!'
100         print '    Current block hash: %x' % (temp_work.previous_block,)
101         print
102         
103         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
104         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
105         factory = bitcoin.p2p.ClientFactory(args.net)
106         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
107         my_script = yield get_payout_script(factory)
108         if args.pubkey_hash is None:
109             if my_script is None:
110                 print '    IP transaction denied ... falling back to sending to address.'
111                 my_script = yield get_payout_script2(bitcoind, args.net)
112         else:
113             my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
114         print '    ...success!'
115         print '    Payout script:', bitcoin.data.script2_to_human(my_script, args.net)
116         print
117         
118         print 'Loading cached block headers...'
119         ht = bitcoin.p2p.HeightTracker(factory, args.net.NAME + '_headers.dat')
120         print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
121         print
122         
123         tracker = p2pool.OkayTracker(args.net)
124         ss = p2pool.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), args.net.NAME + '_shares.'), args.net)
125         known_verified = set()
126         print "Loading shares..."
127         for i, (mode, contents) in enumerate(ss.get_shares()):
128             if mode == 'share':
129                 if contents.hash in tracker.shares:
130                     continue
131                 contents.shared = True
132                 contents.stored = True
133                 contents.time_seen = 0
134                 tracker.add(contents)
135                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
136                     print "    %i" % (len(tracker.shares),)
137             elif mode == 'verified_hash':
138                 known_verified.add(contents)
139             else:
140                 raise AssertionError()
141         print "    ...inserting %i verified shares..." % (len(known_verified),)
142         for h in known_verified:
143             if h not in tracker.shares:
144                 ss.forget_verified_share(h)
145                 continue
146             tracker.verified.add(tracker.shares[h])
147         print "    ...done loading %i shares!" % (len(tracker.shares),)
148         print
149         tracker.added.watch(lambda share: ss.add_share(share))
150         tracker.verified.added.watch(lambda share: ss.add_verified_hash(share.hash))
151         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
152         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
153         
154         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
155         
156         # information affecting work that should trigger a long-polling update
157         current_work = variable.Variable(None)
158         # information affecting work that should not trigger a long-polling update
159         current_work2 = variable.Variable(None)
160         
161         work_updated = variable.Event()
162         
163         requested = expiring_dict.ExpiringDict(300)
164         
165         @defer.inlineCallbacks
166         def set_real_work1():
167             work = yield getwork(bitcoind, ht, args.net)
168             changed = work['previous_block_hash'] != current_work.value['previous_block'] if current_work.value is not None else True
169             current_work.set(dict(
170                 version=work['version'],
171                 previous_block=work['previous_block_hash'],
172                 target=work['target'],
173                 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
174             ))
175             current_work2.set(dict(
176                 transactions=work['transactions'],
177                 subsidy=work['subsidy'],
178                 clock_offset=time.time() - work['time'],
179                 last_update=time.time(),
180             ))
181             if changed:
182                 set_real_work2()
183         
184         def set_real_work2():
185             best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
186             
187             t = dict(current_work.value)
188             t['best_share_hash'] = best
189             current_work.set(t)
190             
191             t = time.time()
192             for peer2, share_hash in desired:
193                 if share_hash not in tracker.tails: # was received in the time tracker.think was running
194                     continue
195                 last_request_time, count = requested.get(share_hash, (None, 0))
196                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
197                     continue
198                 potential_peers = set()
199                 for head in tracker.tails[share_hash]:
200                     potential_peers.update(peer_heads.get(head, set()))
201                 potential_peers = [peer for peer in potential_peers if peer.connected2]
202                 if count == 0 and peer2 is not None and peer2.connected2:
203                     peer = peer2
204                 else:
205                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
206                     if peer is None:
207                         continue
208                 
209                 print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
210                 peer.send_getshares(
211                     hashes=[share_hash],
212                     parents=2000,
213                     stops=list(set(tracker.heads) | set(
214                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
215                     ))[:100],
216                 )
217                 requested[share_hash] = t, count + 1
218         
219         print 'Initializing work...'
220         yield set_real_work1()
221         set_real_work2()
222         print '    ...success!'
223         print
224         
225         start_time = time.time() - current_work2.value['clock_offset']
226         
227         # setup p2p logic and join p2pool network
228         
229         def share_share(share, ignore_peer=None):
230             for peer in p2p_node.peers.itervalues():
231                 if peer is ignore_peer:
232                     continue
233                 #if p2pool_init.DEBUG:
234                 #    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
235                 peer.send_shares([share])
236             share.flag_shared()
237         
238         def p2p_shares(shares, peer=None):
239             if len(shares) > 5:
240                 print 'Processing %i shares...' % (len(shares),)
241             
242             new_count = 0
243             for share in shares:
244                 if share.hash in tracker.shares:
245                     #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
246                     continue
247                 
248                 new_count += 1
249                 
250                 #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
251                 
252                 tracker.add(share)
253             
254             if shares and peer is not None:
255                 peer_heads.setdefault(shares[0].hash, set()).add(peer)
256             
257             if new_count:
258                 set_real_work2()
259             
260             if len(shares) > 5:
261                 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*args.net.CHAIN_LENGTH)
262         
263         @tracker.verified.added.watch
264         def _(share):
265             if share.bitcoin_hash <= share.header['target']:
266                 print
267                 print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
268                 print
269                 if factory.conn.value is not None:
270                     factory.conn.value.send_block(block=share.as_block(tracker, args.net))
271                 else:
272                     print 'No bitcoind connection! Erp!'
273         
274         def p2p_share_hashes(share_hashes, peer):
275             t = time.time()
276             get_hashes = []
277             for share_hash in share_hashes:
278                 if share_hash in tracker.shares:
279                     continue
280                 last_request_time, count = requested.get(share_hash, (None, 0))
281                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
282                     continue
283                 print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
284                 get_hashes.append(share_hash)
285                 requested[share_hash] = t, count + 1
286             
287             if share_hashes and peer is not None:
288                 peer_heads.setdefault(share_hashes[0], set()).add(peer)
289             if get_hashes:
290                 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
291         
292         def p2p_get_shares(share_hashes, parents, stops, peer):
293             parents = min(parents, 1000//len(share_hashes))
294             stops = set(stops)
295             shares = []
296             for share_hash in share_hashes:
297                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
298                     if share.hash in stops:
299                         break
300                     shares.append(share)
301             print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
302             peer.send_shares(shares, full=True)
303         
304         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
305         
306         def parse(x):
307             if ':' in x:
308                 ip, port = x.split(':')
309                 return ip, int(port)
310             else:
311                 return x, args.net.P2P_PORT
312         
313         nodes = set([
314             ('72.14.191.28', args.net.P2P_PORT),
315             ('62.204.197.159', args.net.P2P_PORT),
316             ('142.58.248.28', args.net.P2P_PORT),
317             ('94.23.34.145', args.net.P2P_PORT),
318         ])
319         for host in [
320             'p2pool.forre.st',
321             'dabuttonfactory.com',
322         ]:
323             try:
324                 nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
325             except:
326                 log.err(None, 'Error resolving bootstrap node IP:')
327         
328         p2p_node = p2p.Node(
329             current_work=current_work,
330             port=args.p2pool_port,
331             net=args.net,
332             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.NAME),
333             mode=0 if args.low_bandwidth else 1,
334             preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
335         )
336         p2p_node.handle_shares = p2p_shares
337         p2p_node.handle_share_hashes = p2p_share_hashes
338         p2p_node.handle_get_shares = p2p_get_shares
339         
340         p2p_node.start()
341         
342         # send share when the chain changes to their chain
343         def work_changed(new_work):
344             #print 'Work changed:', new_work
345             for share in tracker.get_chain_known(new_work['best_share_hash']):
346                 if share.shared:
347                     break
348                 share_share(share, share.peer)
349         current_work.changed.watch(work_changed)
350         
351         print '    ...success!'
352         print
353         
354         @defer.inlineCallbacks
355         def upnp_thread():
356             while True:
357                 try:
358                     is_lan, lan_ip = yield ipdiscover.get_local_ip()
359                     if is_lan:
360                         pm = yield portmapper.get_port_mapper()
361                         yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
362                 except defer.TimeoutError:
363                     pass
364                 except:
365                     if p2pool_init.DEBUG:
366                         log.err(None, "UPnP error:")
367                 yield deferral.sleep(random.expovariate(1/120))
368         
369         if args.upnp:
370             upnp_thread()
371         
372         # start listening for workers with a JSON-RPC server
373         
374         print 'Listening for workers on port %i...' % (args.worker_port,)
375         
376         # setup worker logic
377         
378         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
379         run_identifier = struct.pack('<Q', random.randrange(2**64))
380         
381         share_counter = skiplists.CountsSkipList(tracker, run_identifier)
382         removed_unstales = set()
383         def get_share_counts(doa=False):
384             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
385             matching_in_chain = share_counter(current_work.value['best_share_hash'], max(0, height - 1)) | removed_unstales
386             shares_in_chain = my_shares & matching_in_chain
387             stale_shares = my_shares - matching_in_chain
388             if doa:
389                 stale_doa_shares = stale_shares & doa_shares
390                 stale_not_doa_shares = stale_shares - stale_doa_shares
391                 return len(shares_in_chain) + len(stale_shares), len(stale_doa_shares), len(stale_not_doa_shares)
392             return len(shares_in_chain) + len(stale_shares), len(stale_shares)
393         @tracker.verified.removed.watch
394         def _(share):
395             if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
396                 removed_unstales.add(share.hash)
397         
398         def compute(state, payout_script):
399             if payout_script is None or random.uniform(0, 100) < args.worker_fee:
400                 payout_script = my_script
401             if state['best_share_hash'] is None and args.net.PERSIST:
402                 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
403             if len(p2p_node.peers) == 0 and args.net.PERSIST:
404                 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
405             if time.time() > current_work2.value['last_update'] + 60:
406                 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
407             
408             # XXX assuming generate_tx is smallish here..
409             def get_stale_frac():
410                 shares, stale_shares = get_share_counts()
411                 if shares == 0:
412                     return ""
413                 frac = stale_shares/shares
414                 return 2*struct.pack('<H', int(65535*frac + .5))
415             subsidy = current_work2.value['subsidy']
416             generate_tx = p2pool.generate_transaction(
417                 tracker=tracker,
418                 previous_share_hash=state['best_share_hash'],
419                 new_script=payout_script,
420                 subsidy=subsidy,
421                 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)) + get_stale_frac(),
422                 block_target=state['target'],
423                 net=args.net,
424             )
425             print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], (generate_tx['tx_outs'][-1]['value']-subsidy//200)*1e-8, args.net.BITCOIN_SYMBOL, subsidy*1e-8, args.net.BITCOIN_SYMBOL, len(current_work2.value['transactions']))
426             #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
427             #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
428             transactions = [generate_tx] + list(current_work2.value['transactions'])
429             merkle_root = bitcoin.data.merkle_hash(transactions)
430             merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
431             
432             timestamp = int(time.time() - current_work2.value['clock_offset'])
433             if state['best_share_hash'] is not None:
434                 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
435                 if timestamp2 > timestamp:
436                     print 'Toff', timestamp2 - timestamp
437                     timestamp = timestamp2
438             target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
439             times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
440             #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
441             return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
442         
443         my_shares = set()
444         doa_shares = set()
445         times = {}
446         
447         def got_response(data, user):
448             try:
449                 # match up with transactions
450                 header = bitcoin.getwork.decode_data(data)
451                 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
452                 if transactions is None:
453                     print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
454                     return False
455                 block = dict(header=header, txs=transactions)
456                 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
457                 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
458                     if factory.conn.value is not None:
459                         factory.conn.value.send_block(block=block)
460                     else:
461                         print 'No bitcoind connection! Erp!'
462                     if hash_ <= block['header']['target']:
463                         print
464                         print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
465                         print
466                 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
467                 if hash_ > target:
468                     print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (hash_, target)
469                     return False
470                 share = p2pool.Share.from_block(block)
471                 my_shares.add(share.hash)
472                 if share.previous_hash != current_work.value['best_share_hash']:
473                     doa_shares.add(share.hash)
474                 print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
475                 good = share.previous_hash == current_work.value['best_share_hash']
476                 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
477                 p2p_shares([share])
478                 # eg. good = share.hash == current_work.value['best_share_hash'] here
479                 return good
480             except:
481                 log.err(None, 'Error processing data received from worker:')
482                 return False
483         
484         web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
485         
486         def get_rate():
487             if current_work.value['best_share_hash'] is not None:
488                 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
489                 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 720))
490                 fracs = [read_stale_frac(share) for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if read_stale_frac(share) is not None]
491                 return json.dumps(int(att_s / (1. - (math.median(fracs) if fracs else 0))))
492             return json.dumps(None)
493         
494         def get_users():
495             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
496             weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
497             res = {}
498             for script in sorted(weights, key=lambda s: weights[s]):
499                 res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
500             return json.dumps(res)
501         
502         class WebInterface(resource.Resource):
503             def __init__(self, func, mime_type):
504                 self.func, self.mime_type = func, mime_type
505             
506             def render_GET(self, request):
507                 request.setHeader('Content-Type', self.mime_type)
508                 return self.func()
509         
510         web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
511         web_root.putChild('users', WebInterface(get_users, 'application/json'))
512         web_root.putChild('fee', WebInterface(lambda: json.dumps(arg.worker_fee), 'application/json'))
513         if draw is not None:
514             web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
515         
516         reactor.listenTCP(args.worker_port, server.Site(web_root))
517         
518         print '    ...success!'
519         print
520         
521         # done!
522         
523         # do new getwork when a block is heard on the p2p interface
524         
525         def new_block(block_hash):
526             work_updated.happened()
527         factory.new_block.watch(new_block)
528         
529         print 'Started successfully!'
530         print
531         
532         ht.updated.watch(set_real_work2)
533         
534         @defer.inlineCallbacks
535         def work1_thread():
536             while True:
537                 flag = work_updated.get_deferred()
538                 try:
539                     yield set_real_work1()
540                 except:
541                     log.err()
542                 yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
543         
544         @defer.inlineCallbacks
545         def work2_thread():
546             while True:
547                 try:
548                     set_real_work2()
549                 except:
550                     log.err()
551                 yield deferral.sleep(random.expovariate(1/20))
552         
553         work1_thread()
554         work2_thread()
555         
556         
557         if hasattr(signal, 'SIGALRM'):
558             def watchdog_handler(signum, frame):
559                 print 'Watchdog timer went off at:'
560                 traceback.print_stack()
561             
562             signal.signal(signal.SIGALRM, watchdog_handler)
563             task.LoopingCall(signal.alarm, 30).start(1)
564         
565         
566         def read_stale_frac(share):
567             if len(share.nonce) != 20:
568                 return None
569             a, b = struct.unpack("<HH", share.nonce[-4:])
570             if a != b:
571                 return None
572             return a/65535
573         
574         while True:
575             yield deferral.sleep(3)
576             try:
577                 if time.time() > current_work2.value['last_update'] + 60:
578                     print '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
579                 if current_work.value['best_share_hash'] is not None:
580                     height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
581                     if height > 2:
582                         att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 720))
583                         weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**100)
584                         shares, stale_doa_shares, stale_not_doa_shares = get_share_counts(True)
585                         stale_shares = stale_doa_shares + stale_not_doa_shares
586                         fracs = [read_stale_frac(share) for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if read_stale_frac(share) is not None]
587                         print 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
588                             math.format(int(att_s / (1. - (math.median(fracs) if fracs else 0)))),
589                             height,
590                             len(tracker.verified.shares),
591                             len(tracker.shares),
592                             weights.get(my_script, 0)/total_weight*100,
593                             math.format(int(weights.get(my_script, 0)*att_s//total_weight / (1. - (math.median(fracs) if fracs else 0)))),
594                             shares,
595                             stale_not_doa_shares,
596                             stale_doa_shares,
597                             len(p2p_node.peers),
598                         ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
599                         if fracs:
600                             med = math.median(fracs)
601                             print 'Median stale proportion:', med
602                             if shares:
603                                 print '    Own:', stale_shares/shares
604                                 if med < .99:
605                                     print '    Own efficiency: %.02f%%' % (100*(1 - stale_shares/shares)/(1 - med),)
606             
607             
608             except:
609                 log.err()
610     except:
611         log.err(None, 'Fatal error:')
612     finally:
613         reactor.stop()
614
def run():
    """Entry point: parse arguments, set up logging, and start p2pool.

    Builds the command-line interface (with @-file support), redirects
    stdout/stderr through a timestamping tee into a size-capped log file,
    installs a SIGUSR1 handler to reopen that log, fills in
    network-dependent defaults for any unspecified ports, and finally
    starts the Twisted reactor, which invokes main() with the parsed args.
    """
    class FixedArgumentParser(argparse.ArgumentParser):
        """ArgumentParser whose @-file support allows several arguments per line.

        Stock argparse treats each line of a fromfile-prefix file as exactly
        one argument; this override whitespace-splits every line (via
        convert_arg_line_to_args) and expands nested @-references recursively.
        """
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            # recurse so @-files may themselves reference other @-files
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            # split a config-file line on whitespace, dropping empty tokens
            return [arg for arg in arg_line.split() if arg.strip()]
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(x for x in p2pool.nets if 'testnet' not in x), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to specific file (defaults to <network_name>.log in run_p2pool.py's directory)''',
        type=str, action='store', default=None, dest='logfile')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: bitcoin: 9332 namecoin: 9331 ixcoin: 9330 i0coin: 9329, +10000 for testnets)',
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:9332/fee . default: 0''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332, 8338 for ixcoin)',
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # positional arguments: RPC username (optional, defaults empty) and password (required)
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: empty)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool_init.DEBUG = True
    
    # default the log file to <network>[_testnet].log next to the startup script
    if args.logfile is None:
        args.logfile = os.path.join(os.path.dirname(sys.argv[0]), args.net_name + ('_testnet' if args.testnet else '') + '.log')
    
    class LogFile(object):
        """Append-only log sink that can be reopened and self-truncates.

        reopen() closes any open handle and, if the file has grown past
        100 MB, rewrites it keeping only roughly its final 1 MB (aligned
        to a line boundary) before reopening it in append mode.
        """
        def __init__(self, filename):
            self.filename = filename
            self.inner_file = None  # current append-mode handle
            self.reopen()
        def reopen(self):
            if self.inner_file is not None:
                self.inner_file.close()
            open(self.filename, 'a').close()  # make sure the file exists
            f = open(self.filename, 'rb')
            f.seek(0, os.SEEK_END)
            length = f.tell()
            if length > 100*1000*1000:
                # keep only the last ~1 MB; scan forward to the next
                # newline (or EOF) so the retained data starts on a full line
                f.seek(-1000*1000, os.SEEK_END)
                while True:
                    if f.read(1) in ('', '\n'):
                        break
                data = f.read()
                f.close()
                f = open(self.filename, 'wb')
                f.write(data)
            f.close()
            self.inner_file = open(self.filename, 'a')
        def write(self, data):
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class TeePipe(object):
        # file-like object that duplicates every write/flush to several outputs
        def __init__(self, outputs):
            self.outputs = outputs
        def write(self, data):
            for output in self.outputs:
                output.write(data)
        def flush(self):
            for output in self.outputs:
                output.flush()
    class TimestampingPipe(object):
        # file-like wrapper that prefixes each completed line with a
        # HH:MM:SS.microseconds timestamp; the trailing partial line is
        # buffered until its newline arrives
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.buf = ''  # holds the not-yet-terminated final line
            self.softspace = 0  # NOTE: Python 2's print statement reads/sets this attribute on file-like objects
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            for line in lines[:-1]:
                self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
                self.inner_file.flush()
            self.buf = lines[-1]
        def flush(self):
            pass
    logfile = LogFile(args.logfile)
    # route normal prints and Twisted log output through the timestamper
    # into both the original stderr and the log file
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
    if hasattr(signal, "SIGUSR1"):
        # support external log rotation: SIGUSR1 makes us reopen the file
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)  # periodic reopen also enforces the 100 MB size cap
    
    args.net = p2pool.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    # fill in network-specific defaults for any port left unspecified
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = args.net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = args.net.WORKER_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception, e:
            raise ValueError('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None  # per the -a help text, an address is then requested from bitcoind
    
    reactor.callWhenRunning(main, args)
    reactor.run()