Commit subject: "reopen logfile every five seconds"
Path: p2pool.git / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import datetime
7 import itertools
8 import os
9 import random
10 import sqlite3
11 import struct
12 import sys
13 import time
14 import json
15 import signal
16 import traceback
17
18 from twisted.internet import defer, reactor, task, threads
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
24 from util import db, expiring_dict, jsonrpc, variable, deferral, math
25 from . import p2p, worker_interface, skiplists
26 import p2pool.data as p2pool
27 import p2pool as p2pool_init
28
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, ht):
    # Fetch a fresh block template from bitcoind and look up the height of
    # its previous block. Note: a block could arrive in between these two
    # queries, so the pair is only approximately consistent.
    getwork_response = yield bitcoind.rpc_getwork()
    work = bitcoin.getwork.BlockAttempt.from_getwork(getwork_response)
    try:
        height = ht.getHeight(work.previous_block)
    except ValueError:
        height = 1000 # XXX fallback when the previous block is not in the height tracker
    defer.returnValue((work, height))
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    # Probe the connected bitcoind with a null checkorder (IP transaction)
    # to obtain a script to send payouts to. Yields the script on success,
    # None if the request was denied, and raises on any other reply.
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    if reply == 'success':
        defer.returnValue(res['script'])
    if reply == 'denied':
        defer.returnValue(None)
    raise ValueError('Unexpected reply: %r' % (res,))
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    # Fallback payout target: ask the local wallet for the 'p2pool' account
    # address and convert it into a pay-to-pubkey-hash script.
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
@defer.inlineCallbacks
def main(args):
    """Set up and run a p2pool node under the Twisted reactor.

    Connects to bitcoind over JSON-RPC and the bitcoin p2p protocol, loads
    cached shares, joins the p2pool p2p network, serves work to miners over
    a JSON-RPC worker interface, and then loops printing status forever.
    Stops the reactor when it exits (fatal error or return).
    """
    try:
        if args.charts:
            from . import draw
        
        print 'p2pool (version %s)' % (p2pool_init.__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getwork
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        # BITCOIN_RPC_CHECK verifies we are talking to the right daemon for this network
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(args.net.BITCOIN_RPC_CHECK)(bitcoind)
        if not good:
            print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield deferral.retry('Error while testing getwork:', 1)(defer.inlineCallbacks(lambda: defer.returnValue(bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork())))))()
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work.previous_block,)
        print
        
        # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin.p2p.ClientFactory(args.net)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        my_script = yield get_payout_script(factory)
        if args.pubkey_hash is None:
            if my_script is None:
                print '    IP transaction denied ... falling back to sending to address.'
                my_script = yield get_payout_script2(bitcoind, args.net)
        else:
            # an explicit --address overrides whatever the checkorder probe returned
            my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', my_script.encode('hex')
        print
        
        print 'Loading cached block headers...'
        ht = bitcoin.p2p.HeightTracker(factory, args.net.NAME + '_headers.dat')
        print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
        print
        
        tracker = p2pool.OkayTracker(args.net)
        ss = p2pool.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), args.net.NAME + '_shares.'), args.net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                # shares loaded from disk are already shared/stored; don't rebroadcast
                contents.shared = True
                contents.stored = True
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                # verified-hash record with no matching share: drop it from the store
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        # keep the on-disk share store in sync with the in-memory tracker
        tracker.added.watch(lambda share: ss.add_share(share))
        tracker.verified.added.watch(lambda share: ss.add_verified_hash(share.hash))
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        work_updated = variable.Event()
        
        # share_hash -> (last_request_time, request_count), for getshares rate limiting
        requested = expiring_dict.ExpiringDict(300)
        
        @defer.inlineCallbacks
        def set_real_work1():
            # Poll bitcoind for new work; on a new previous_block also rethink
            # the best share chain via set_real_work2().
            work, height = yield getwork(bitcoind, ht)
            changed = work.previous_block != current_work.value['previous_block'] if current_work.value is not None else True
            current_work.set(dict(
                version=work.version,
                previous_block=work.previous_block,
                target=work.target,
                height=height,
                best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
            ))
            current_work2.set(dict(
                clock_offset=time.time() - work.timestamp,
                last_update=time.time(),
            ))
            if changed:
                set_real_work2()
        
        def set_real_work2():
            # Recompute the best share to mine on and request any missing
            # parent shares ('desired') from peers, with backoff per hash.
            best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                # exponential backoff: wait 10 * 1.5**count seconds between re-requests
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    # after the first try, usually pick a random peer that claims to know this chain
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        
        print 'Initializing work...'
        yield set_real_work1()
        set_real_work2()
        print '    ...success!'
        print
        
        start_time = time.time() - current_work2.value['clock_offset']
        
        # setup p2p logic and join p2pool network
        
        def share_share(share, ignore_peer=None):
            # broadcast a share to every connected peer except its origin
            for peer in p2p_node.peers.itervalues():
                if peer is ignore_peer:
                    continue
                #if p2pool_init.DEBUG:
                #    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
                peer.send_shares([share])
            share.flag_shared()
        
        def p2p_shares(shares, peer=None):
            # handler for shares arriving from the p2p network (also reused
            # below for shares found by our own workers)
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            some_new = False
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
                    continue
                some_new = True
                
                #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
                #for peer2, share_hash in desired:
                #    print 'Requesting parent share %x' % (share_hash,)
                #    peer2.send_getshares(hashes=[share_hash], parents=2000)
                
                # a share that also satisfies the bitcoin target is a full block
                if share.bitcoin_hash <= share.header['target']:
                    print
                    print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
                    print
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=share.as_block(tracker, args.net))
                    else:
                        print 'No bitcoind connection! Erp!'
            
            if shares and peer is not None:
                # remember that this peer knows the chain ending at shares[0]
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if some_new:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. Have: %i/~%i' % (len(shares), len(tracker.shares), 2*args.net.CHAIN_LENGTH)
        
        def p2p_share_hashes(share_hashes, peer):
            # handler for share-hash announcements: request hashes we don't
            # have yet, with the same backoff scheme as set_real_work2()
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
        
        def p2p_get_shares(share_hashes, parents, stops, peer):
            # handler for a peer's getshares request: walk each requested
            # chain up to 'parents' ancestors, stopping at any 'stops' hash
            parents = min(parents, 1000//len(share_hashes)) # cap total response size
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
                    if share.hash in stops:
                        break
                    shares.append(share)
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.send_shares(shares, full=True)
        
        print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
        
        def parse(x):
            # parse a 'host' or 'host:port' bootstrap-node argument
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, args.net.P2P_PORT
        
        # hard-coded bootstrap nodes, plus DNS-resolved ones below
        nodes = set([
            ('72.14.191.28', args.net.P2P_PORT),
            ('62.204.197.159', args.net.P2P_PORT),
            ('142.58.248.28', args.net.P2P_PORT),
            ('94.23.34.145', args.net.P2P_PORT),
        ])
        for host in [
            'p2pool.forre.st',
            'dabuttonfactory.com',
        ]:
            try:
                nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
            except:
                log.err(None, 'Error resolving bootstrap node IP:')
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=args.net,
            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.NAME),
            mode=0 if args.low_bandwidth else 1,
            preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
        )
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            # walk back from the new best share, broadcasting every share
            # that hasn't been shared yet
            for share in tracker.get_chain_known(new_work['best_share_hash']):
                if share.shared:
                    break
                share_share(share, share.peer)
        current_work.changed.watch(work_changed)
        
        print '    ...success!'
        print
        
        @defer.inlineCallbacks
        def upnp_thread():
            # periodically (re-)register a UPnP port mapping; best-effort
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except:
                    if p2pool_init.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        # random per-run tag embedded in share nonces so we can recognize our own shares
        run_identifier = struct.pack('<Q', random.randrange(2**64))
        
        share_counter = skiplists.CountsSkipList(tracker, run_identifier)
        removed_unstales = set()
        def get_share_counts():
            # returns (total own shares, own stale shares) for the current chain
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            matching_in_chain = share_counter(current_work.value['best_share_hash'], height) | removed_unstales
            shares_in_chain = my_shares & matching_in_chain
            stale_shares = my_shares - matching_in_chain
            return len(shares_in_chain) + len(stale_shares), len(stale_shares)
        @tracker.verified.removed.watch
        def _(share):
            # keep counting our shares that fell off the verified chain as non-stale
            if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_unstales.add(share.hash)
        
        def compute(state, payout_script):
            # build a getwork BlockAttempt for a worker from the current state
            if payout_script is None:
                payout_script = my_script
            if state['best_share_hash'] is None and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if len(p2p_node.peers) == 0 and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
            pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
            extra_txs = []
            size = 0
            for tx in pre_extra_txs:
                this_size = len(bitcoin.data.tx_type.pack(tx.tx))
                if size + this_size > 500000:
                    break
                extra_txs.append(tx)
                size += this_size
            # XXX check sigops!
            # XXX assuming generate_tx is smallish here..
            def get_stale_frac():
                # encode our stale fraction as a doubled little-endian uint16
                # (doubled so read_stale_frac can validate it; '' if no shares yet)
                shares, stale_shares = get_share_counts()
                if shares == 0:
                    return ""
                frac = stale_shares/shares
                return 2*struct.pack('<H', int(65535*frac + .5))
            subsidy = args.net.BITCOIN_SUBSIDY_FUNC(state['height']) + sum(tx.value_in - tx.value_out for tx in extra_txs)
            generate_tx = p2pool.generate_transaction(
                tracker=tracker,
                previous_share_hash=state['best_share_hash'],
                new_script=payout_script,
                subsidy=subsidy,
                nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)) + get_stale_frac(),
                block_target=state['target'],
                net=args.net,
            )
            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], (generate_tx['tx_outs'][-1]['value']-subsidy//200)*1e-8, args.net.BITCOIN_SYMBOL)
            #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + [tx.tx for tx in extra_txs]
            merkle_root = bitcoin.data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
            
            timestamp = int(time.time() - current_work2.value['clock_offset'])
            if state['best_share_hash'] is not None:
                # never timestamp earlier than the median of the last 11 shares, plus one
                timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
                if timestamp2 > timestamp:
                    print 'Toff', timestamp2 - timestamp
                    timestamp = timestamp2
            target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            # record issue time per nonce so got_response can report share age
            times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
            #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
        
        my_shares = set()
        times = {}
        
        def got_response(data, user):
            # handle a worker's submitted proof-of-work; returns True iff the
            # share was accepted onto the current best chain
            try:
                # match up with transactions
                header = bitcoin.getwork.decode_data(data)
                transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
                if transactions is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                block = dict(header=header, txs=transactions)
                hash_ = bitcoin.data.block_header_type.hash256(block['header'])
                # NOTE: in DEBUG mode this submits to bitcoind even when the
                # block target isn't met (bitcoind will just reject it)
                if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=block)
                    else:
                        print 'No bitcoind connection! Erp!'
                    if hash_ <= block['header']['target']:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                        print
                target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
                if hash_ > target:
                    print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (hash_, target)
                    return False
                share = p2pool.Share.from_block(block)
                my_shares.add(share.hash)
                print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
        
        web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
        
        def get_rate():
            # JSON endpoint: recent pool hash rate (attempts/second), or null
            if current_work.value['best_share_hash'] is not None:
                height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height, 720))
                return json.dumps(att_s)
            return json.dumps(None)
        
        def get_users():
            # JSON endpoint: payout-script -> fraction of recent pool weight
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
            return json.dumps(res)
        
        class WebInterface(resource.Resource):
            # minimal twisted.web resource wrapping a zero-argument function
            def __init__(self, func, mime_type):
                self.func, self.mime_type = func, mime_type
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                return self.func()
        
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        if args.charts:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        # done!
        
        tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
        get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
        
        class Tx(object):
            # wraps a mempool transaction, tracking its input value and
            # whether all of its parents are already in blocks
            def __init__(self, tx, seen_at_block):
                self.hash = bitcoin.data.tx_type.hash256(tx)
                self.tx = tx
                self.seen_at_block = seen_at_block
                self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                #print
                #print '%x %r' % (seen_at_block, tx)
                #for mention in self.mentions:
                #    print '%x' % mention
                #print
                self.parents_all_in_blocks = False
                self.value_in = 0
                #print self.tx
                self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
                self._find_parents_in_blocks()
            
            @defer.inlineCallbacks
            def _find_parents_in_blocks(self):
                # asynchronously resolve each input; abort (leaving
                # parents_all_in_blocks False) if any parent is unknown or unconfirmed
                for tx_in in self.tx['tx_ins']:
                    try:
                        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
                    except Exception:
                        return
                    self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
                    #print raw_transaction
                    if not raw_transaction['parent_blocks']:
                        return
                self.parents_all_in_blocks = True
            
            def is_good(self):
                if not self.parents_all_in_blocks:
                    return False
                # NOTE(review): is_good2 is not defined in this file — presumably
                # provided elsewhere; verify before re-enabling the tx pool
                x = self.is_good2()
                #print 'is_good:', x
                return x
        
        @defer.inlineCallbacks
        def new_tx(tx_hash):
            # fetch an announced transaction from bitcoind and add it to the pool
            try:
                assert isinstance(tx_hash, (int, long))
                #print 'REQUESTING', tx_hash
                tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
                #print 'GOT', tx
                tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
            except:
                log.err(None, 'Error handling tx:')
        # disable for now, for testing impact on stales
        #factory.new_tx.watch(new_tx)
        
        def new_block(block_hash):
            # a new bitcoin block: wake work1_thread to refresh work immediately
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        ht.updated.watch(set_real_work2)
        
        @defer.inlineCallbacks
        def work1_thread():
            # refresh bitcoind work every 1-10 s, or immediately on new-block events
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
        
        @defer.inlineCallbacks
        def work2_thread():
            # periodically rethink the share chain (mean interval ~20 s)
            while True:
                try:
                    set_real_work2()
                except:
                    log.err()
                yield deferral.sleep(random.expovariate(1/20))
        
        work1_thread()
        work2_thread()
        
        
        if hasattr(signal, 'SIGALRM'):
            # watchdog: arm a 30 s alarm every second; if the reactor stalls
            # for 30 s the handler dumps a stack trace
            def watchdog_handler(signum, frame):
                print 'Watchdog timer went off at:'
                traceback.print_stack()
            
            signal.signal(signal.SIGALRM, watchdog_handler)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        
        def read_stale_frac(share):
            # decode the stale fraction that compute()'s get_stale_frac()
            # embedded in the share nonce; None if absent or corrupted
            if len(share.nonce) != 20:
                return None
            a, b = struct.unpack("<HH", share.nonce[-4:])
            if a != b:
                return None
            return a/65535
        
        # main status loop: print pool statistics every ~3 seconds forever
        while True:
            yield deferral.sleep(3)
            try:
                if time.time() > current_work2.value['last_update'] + 60:
                    print '''LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead'''
                if current_work.value['best_share_hash'] is not None:
                    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                    if height > 2:
                        att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 120))
                        weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**100)
                        shares, stale_shares = get_share_counts()
                        print 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i stale) Peers: %i' % (
                            math.format(att_s),
                            height,
                            len(tracker.verified.shares),
                            len(tracker.shares),
                            weights.get(my_script, 0)/total_weight*100,
                            math.format(weights.get(my_script, 0)/total_weight*att_s),
                            shares,
                            stale_shares,
                            len(p2p_node.peers),
                        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
                        fracs = [read_stale_frac(share) for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if read_stale_frac(share) is not None]
                        if fracs:
                            med = math.median(fracs)
                            print 'Median stale proportion:', med
                            if shares:
                                print '    Own:', stale_shares/shares
                                if med < .99:
                                    print '    Own efficiency: %.02f%%' % (100*(1 - stale_shares/shares)/(1 - med),)
                            
                            
            except:
                log.err()
    except:
        log.err(None, 'Fatal error:')
    finally:
        reactor.stop()
636
def run():
    """Parse command-line arguments, set up timestamped/teed logging, and
    start the Twisted reactor running main().

    This is the process entry point: it never returns until the reactor
    stops.
    """
    class FixedArgumentParser(argparse.ArgumentParser):
        """ArgumentParser with a reworked @file expansion.

        Overrides _read_args_from_files so that arguments read from an
        @-prefixed file are themselves expanded recursively and split via
        convert_arg_line_to_args (patched below to allow several
        whitespace-separated arguments per line).
        NOTE(review): appears to be a local fix/backport of argparse's
        fromfile handling — confirm against the stdlib version targeted.
        """
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            # collect every argument on every line of the file
                            # (rebinding arg_strings here does not disturb the
                            # outer for-loop, which iterates the original list)
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            # recurse so @file references inside the file also expand
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        # report unreadable @files as a parser error (exits)
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,), fromfile_prefix_chars='@')
    # split @file lines on whitespace, dropping empty tokens
    parser.convert_arg_line_to_args = lambda arg_line: (arg for arg in arg_line.split() if arg.strip())
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(x for x in p2pool.nets if 'testnet' not in x), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--charts',
        help='generate charts on the web interface (requires PIL and pygame)',
        action='store_const', const=True, default=False, dest='charts')
    parser.add_argument('--logfile',
        help='''log to specific file (defaults to <network_name>.log in run_p2pool.py's directory)''',
        type=str, action='store', default=None, dest='logfile')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: bitcoin: 9332 namecoin: 9331 ixcoin: 9330 i0coin: 9329, +10000 for testnets)',
        type=int, action='store', default=None, dest='worker_port')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332, 8338 for ixcoin)',
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # positional arguments: RPC credentials (username optional, password required)
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: empty)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool_init.DEBUG = True
    
    # default logfile: <net>[_testnet].log next to the launching script
    if args.logfile is None:
       args.logfile = os.path.join(os.path.dirname(sys.argv[0]), args.net_name + ('_testnet' if args.testnet else '') + '.log')
    
    class ReopeningFile(object):
        """File-like wrapper that can close and reopen its underlying file
        with the original open() arguments, so external log rotation or
        truncation is picked up without restarting the process."""
        def __init__(self, *open_args, **open_kwargs):
            self.open_args, self.open_kwargs = open_args, open_kwargs
            self.inner_file = open(*self.open_args, **self.open_kwargs)
        def reopen(self):
            # close then reopen with the same arguments
            self.inner_file.close()
            self.inner_file = open(*self.open_args, **self.open_kwargs)
        def write(self, data):
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class TeePipe(object):
        """Write-only file-like object that duplicates writes/flushes to
        every output in a list (here: real stderr plus the logfile)."""
        def __init__(self, outputs):
            self.outputs = outputs
        def write(self, data):
            for output in self.outputs:
                output.write(data)
        def flush(self):
            for output in self.outputs:
                output.flush()
    class TimestampingPipe(object):
        """Write-only file-like object that prefixes each complete line
        with a HH:MM:SS.microseconds timestamp before passing it on.
        Partial lines are buffered until their newline arrives."""
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.buf = ''
            # attribute read/written by the Python 2 print statement for
            # file-like objects — presumably required for `print >>`-style
            # use; TODO confirm
            self.softspace = 0
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            # emit every complete line with a timestamp; keep the remainder
            for line in lines[:-1]:
                self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
                self.inner_file.flush()
            self.buf = lines[-1]
        def flush(self):
            # writes above flush per line, so nothing to do here
            pass
    logfile = ReopeningFile(args.logfile, 'a')
    # route all stdout/stderr/twisted-log output through the timestamping tee
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
    # SIGUSR1 (where available, i.e. not Windows) also forces a logfile reopen
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    # additionally reopen the logfile every five seconds so rotation is
    # picked up even without a signal
    task.LoopingCall(logfile.reopen).start(5)
    
    args.net = p2pool.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    # fill in per-network defaults for any ports the user did not specify
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = args.net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = args.net.WORKER_PORT
    
    # validate the payout address up front; None means main() will request
    # one from bitcoind instead
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception, e:
            raise ValueError('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    # start main() once the reactor is up; reactor.run() blocks until stop
    reactor.callWhenRunning(main, args)
    reactor.run()